"""General functions useful across different parts of the code."""
import copy
import json
import logging
import os
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import urlparse
import numpy as np
import yaml
# Names exported by a star-import of this module. Functions defined in this
# file but not listed here are still importable by their explicit name.
__all__ = [
"change_dict_keys_case",
"collect_data_from_file_or_dict",
"collect_final_lines",
"collect_kwargs",
"InvalidConfigDataError",
"get_log_level_from_user",
"remove_substring_recursively_from_dict",
"set_default_kwargs",
"get_log_excerpt",
"sort_arrays",
]
# Module-level logger used by the functions in this file.
_logger = logging.getLogger(__name__)
[docs]
class InvalidConfigDataError(Exception):
    """Error signalling that configuration data is invalid."""
def join_url_or_path(url_or_path, *args):
    """
    Join URL or path with additional subdirectories and file.

    This is the equivalent to Path.joinpath(), with extended functionality
    working also for URLs.

    Parameters
    ----------
    url_or_path: str or Path
        URL or path to be extended.
    args: list
        Additional arguments (str or Path) to be added to the URL or path.

    Returns
    -------
    str or Path
        Extended URL (str) or path (Path).
    """
    base = str(url_or_path)
    if "://" in base:
        # Convert every segment to str so that Path-like segments can be joined as well.
        return "/".join([base.rstrip("/"), *(str(arg) for arg in args)])
    return Path(url_or_path).joinpath(*args)
def is_url(url):
    """
    Check if a string is a valid URL.

    A value counts as a URL when it parses with both a scheme and a network location.

    Parameters
    ----------
    url: str
        String to be checked.

    Returns
    -------
    bool
        True if url is a valid URL.
    """
    try:
        parsed = urlparse(url)
    except (AttributeError, ValueError):
        # AttributeError: input type that urlparse cannot handle.
        # ValueError: malformed URL (e.g., unbalanced IPv6 bracket).
        return False
    return all([parsed.scheme, parsed.netloc])
def collect_data_from_http(url):
    """
    Download yaml, json, or list file from url and return its contents.

    File is downloaded as a temporary file and deleted afterwards.

    Parameters
    ----------
    url: str
        URL of the yaml/json/list file.

    Returns
    -------
    dict or list
        Parsed file content (list of stripped lines for files ending in 'list').

    Raises
    ------
    TypeError
        If url is not a valid URL or its file extension is not supported.
    FileNotFoundError
        If downloading the file fails with an HTTP error.
    """
    try:
        with tempfile.NamedTemporaryFile(mode="w+t") as tmp_file:
            urllib.request.urlretrieve(url, tmp_file.name)
            if url.endswith(("yml", "yaml")):
                try:
                    data = yaml.safe_load(tmp_file)
                except yaml.constructor.ConstructorError:
                    # Fall back to the astropy loader for tags safe_load cannot construct.
                    data = _load_yaml_using_astropy(tmp_file)
            elif url.endswith("json"):
                data = json.load(tmp_file)
            elif url.endswith("list"):
                data = [line.strip() for line in tmp_file.readlines()]
            else:
                msg = f"File extension of {url} not supported (should be json or yaml)"
                _logger.error(msg)
                raise TypeError(msg)
    except TypeError as exc:
        # Also reached for the unsupported-extension error raised above.
        msg = f"Invalid url {url}"
        _logger.error(msg)
        raise TypeError(msg) from exc
    except urllib.error.HTTPError as exc:
        msg = f"Failed to download file from {url}"
        _logger.error(msg)
        raise FileNotFoundError(msg) from exc

    _logger.debug(f"Downloaded file from {url}")
    return data
[docs]
def collect_data_from_file_or_dict(file_name, in_dict, allow_empty=False):
    """
    Return input data taken from a file or, failing that, from a dictionary.

    A given file name takes precedence over the dictionary.

    Parameters
    ----------
    file_name: str
        Name of the yaml/json/ascii file.
    in_dict: dict
        Data as dict.
    allow_empty: bool
        If True, None is returned (instead of raising) when both file_name and in_dict are None.

    Returns
    -------
    data: dict or list
        Data as dict or list (a shallow copy when taken from in_dict).

    Raises
    ------
    AttributeError
        If no input has been provided (neither by file, nor by dict) and allow_empty is False.
    """
    if file_name is not None:
        return collect_data_from_file(file_name, in_dict)
    if in_dict is not None:
        return dict(in_dict)

    # Neither source was given: same debug message for both outcomes.
    msg = "Input has not been provided (neither by file, nor by dict)"
    _logger.debug(msg)
    if allow_empty:
        return None
    raise AttributeError(msg)
def collect_data_from_file(file_name, in_dict):
    """
    Read data from a file (or URL), choosing the parser from the file extension.

    Files ending in '.json' are parsed as json, files ending in '.list' are returned
    as a list of stripped lines, and everything else is parsed as yaml.

    Parameters
    ----------
    file_name: str
        Name of the yaml/json/ascii file (or URL pointing to it).
    in_dict: dict
        Data as dict. Only used to warn that it is ignored in favour of file_name.

    Returns
    -------
    data: dict or list
        Data as dict or list.
    """
    if in_dict is not None:
        _logger.warning("Both in_dict and file_name were given - file_name will be used")

    if is_url(file_name):
        return collect_data_from_http(file_name)

    with open(file_name, encoding="utf-8") as file:
        suffix = Path(file_name).suffix.lower()
        if suffix == ".json":
            return json.load(file)
        if suffix == ".list":
            return [line.strip() for line in file]
        try:
            return yaml.safe_load(file)
        except yaml.constructor.ConstructorError:
            return _load_yaml_using_astropy(file)
[docs]
def collect_kwargs(label, in_kwargs):
    """
    Collect kwargs of the type label_* and return them as a dict.

    Only keys starting with 'label_' are collected; this prefix is stripped once
    from the returned keys.

    Parameters
    ----------
    label: str
        Label to be collected in kwargs.
    in_kwargs: dict
        kwargs.

    Returns
    -------
    dict
        Dictionary with the collected kwargs (keys without the 'label_' prefix).
    """
    prefix = label + "_"
    # Match on the prefix only: a substring match would also pick up (and mangle)
    # unrelated keys that merely contain 'label_' somewhere in their name.
    return {
        key.removeprefix(prefix): value
        for key, value in in_kwargs.items()
        if key.startswith(prefix)
    }
[docs]
def set_default_kwargs(in_kwargs, **kwargs):
    """
    Add default values to a dict for every key it does not already contain.

    The input dict is modified in place and returned.

    Parameters
    ----------
    in_kwargs: dict
        Input dict to be filled in with the default kwargs.
    **kwargs:
        Default kwargs to be set.

    Returns
    -------
    dict
        The input dict, completed with the default kwargs.
    """
    for name, default in kwargs.items():
        in_kwargs.setdefault(name, default)
    return in_kwargs
[docs]
def collect_final_lines(file, n_lines):
    """
    Collect the final lines of a (plain or gzip-compressed) file.

    The file is read backwards byte by byte. Collected lines are concatenated
    without any separator (the newline characters are dropped). A newline at the
    very end of the file counts as one (empty) line.

    Parameters
    ----------
    file: str or Path
        File to collect the lines from (opened with gzip if the suffix is '.gz').
    n_lines: int
        Number of lines to be collected.

    Returns
    -------
    str
        Final lines collected.
    """
    if Path(file).suffix == ".gz":
        import gzip  # pylint: disable=import-outside-toplevel

        file_open_function = gzip.open
    else:
        file_open_function = open

    list_of_lines = []
    with file_open_function(file, "rb") as read_obj:
        # Start at the end of the file and walk towards the beginning.
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the line currently being read, stored in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b"\n":
                # Restore byte order before decoding; decoding the reversed bytes
                # would fail for multi-byte UTF-8 characters.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == n_lines:
                    return "".join(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # Beginning of file reached: remaining bytes form the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return "".join(reversed(list_of_lines))
[docs]
def get_log_level_from_user(log_level):
    """
    Translate a user-provided log level name into a level of the logging module.

    The lookup is case-insensitive.

    Parameters
    ----------
    log_level: str
        Log level from the user.

    Returns
    -------
    logging.LEVEL
        The requested logging level to be used as input to logging.setLevel().

    Raises
    ------
    ValueError
        If log_level is not one of the known level names.
    """
    level_by_name = {
        "info": logging.INFO,
        "debug": logging.DEBUG,
        "warn": logging.WARNING,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    try:
        lookup_key = log_level.lower()
    except AttributeError:
        # Not a string; use the value as is (and fail the lookup below).
        lookup_key = log_level

    if lookup_key in level_by_name:
        return level_by_name[lookup_key]

    raise ValueError(
        f"'{log_level}' is not a logging level, "
        f"only possible ones are {list(level_by_name)}"
    )
def copy_as_list(value):
    """
    Return value as a new list.

    Strings and non-iterable values are wrapped into a single-entry list;
    any other iterable is converted with list().

    Parameters
    ----------
    value: single variable of any type or list

    Returns
    -------
    value: list
        Copy of value if it is a list or [value] otherwise.
    """
    if not isinstance(value, str):
        try:
            return list(value)
        except TypeError:
            # Not iterable: fall through and wrap it.
            pass
    return [value]
def program_is_executable(program):
    """
    Check if program exists and is executable.

    If program contains a directory part, only that exact location is checked.
    Otherwise, the directories listed in the PATH environment variable are searched.

    Follows https://stackoverflow.com/questions/377017/

    Parameters
    ----------
    program: str
        Program name or path to the program.

    Returns
    -------
    str or None
        Path of the executable if found, None otherwise.
    """

    def _executable_file(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory, _ = os.path.split(program)
    if directory:
        return program if _executable_file(program) else None

    try:
        search_path = os.environ["PATH"]
    except KeyError:
        _logger.debug("PATH environment variable is not set.")
        return None

    for path in search_path.split(os.pathsep):
        exe_file = os.path.join(path, program)
        if _executable_file(exe_file):
            return exe_file
    return None
def _search_directory(directory, filename, rec=False):
if not Path(directory).exists():
_logger.debug(f"Directory {directory} does not exist")
return None
file = Path(directory).joinpath(filename)
if file.exists():
_logger.debug(f"File {filename} found in {directory}")
return file
if rec:
for subdir in Path(directory).iterdir():
if subdir.is_dir():
file = _search_directory(subdir, filename, True)
if file:
return file
return None
def find_file(name, loc):
    """
    Search for a file and return its full path.

    The current directory is checked first (not recursively); afterwards the given
    locations are searched recursively, in the order given.

    Parameters
    ----------
    name: str
        File name to be searched for.
    loc: Path or list of Path
        Location of where to search for the file.

    Returns
    -------
    Path
        Full path of the file.

    Raises
    ------
    FileNotFoundError
        If the desired file is not found.
    """
    all_locations = loc if isinstance(loc, list) else [loc]

    # (directory, recursive) pairs in search order: local directory first.
    searches = [(".", False), *((location, True) for location in all_locations)]
    for directory, recursive in searches:
        match = _search_directory(directory, name, recursive)
        if match:
            return match

    msg = f"File {name} could not be found in {all_locations}"
    _logger.error(msg)
    raise FileNotFoundError(msg)
[docs]
def get_log_excerpt(log_file, n_last_lines=30):
    """
    Build a message containing the last lines of a log file.

    Parameters
    ----------
    log_file: str or Path
        Log file to get the excerpt from.
    n_last_lines: int
        Number of last lines of the file to get.

    Returns
    -------
    str
        Excerpt from log file with header/footer
    """
    separator = "===================================================================="
    parts = [
        "\n\nRuntime error - See below the relevant part of the log/err file.\n\n",
        f"{log_file}\n",
        f"{separator}\n\n",
    ]
    parts.append(f"{collect_final_lines(log_file, n_last_lines)}\n\n")
    parts.append(f"{separator}\n")
    return "".join(parts)
def get_file_age(file_path):
    """
    Get the age of a file in minutes since the last modification.

    Parameters
    ----------
    file_path: str or Path
        Path of the file.

    Returns
    -------
    float
        Time since the last modification of the file, in minutes.

    Raises
    ------
    FileNotFoundError
        If file_path does not exist or is not a file.
    """
    if not Path(file_path).is_file():
        raise FileNotFoundError(f"'{file_path}' does not exist or is not a file.")

    seconds_since_modification = time.time() - os.stat(file_path).st_mtime
    # Convert seconds to minutes.
    return seconds_since_modification / 60
def _process_dict_keys(input_dict, case_func):
"""
Process dictionary keys recursively.
Parameters
----------
input_dict: dict
Dictionary to be processed.
case_func: function
Function to change case of keys (e.g., str.lower, str.upper).
Returns
-------
dict
Processed dictionary with keys changed.
"""
output_dict = {}
for key, value in input_dict.items():
processed_key = case_func(key)
if isinstance(value, dict):
output_dict[processed_key] = _process_dict_keys(value, case_func)
elif isinstance(value, list):
processed_list = [
_process_dict_keys(item, case_func) if isinstance(item, dict) else item
for item in value
]
output_dict[processed_key] = processed_list
else:
output_dict[processed_key] = value
return output_dict
[docs]
def change_dict_keys_case(data_dict, lower_case=True):
    """
    Convert all keys of a dictionary to lower or upper case, recursively.

    Parameters
    ----------
    data_dict: dict
        Dictionary to be converted.
    lower_case: bool
        Change keys to lower (upper) case if True (False).

    Returns
    -------
    dict
        Dictionary with keys converted to lower or upper case.

    Raises
    ------
    AttributeError
        If the input is not a proper dictionary.
    """
    if lower_case:
        case_func = str.lower
    else:
        case_func = str.upper

    try:
        converted = _process_dict_keys(data_dict, case_func)
    except AttributeError as exc:
        _logger.error(f"Input is not a proper dictionary: {data_dict}")
        raise AttributeError from exc
    return converted
[docs]
def remove_substring_recursively_from_dict(data_dict, substring="\n"):
    """
    Remove a substring from all string values of a dictionary.

    The dictionary is crawled recursively (nested dictionaries, strings and
    dictionaries inside lists) and modified in place. This allows e.g. to remove
    all newline characters from a dictionary. Input without an items() method is
    returned unchanged (a debug message is logged).

    Parameters
    ----------
    data_dict: dict
        Dictionary to be converted.
    substring: str
        Substring to be removed.

    Returns
    -------
    dict
        The input dictionary, with the substring removed.
    """

    def _clean_list_entry(entry):
        if isinstance(entry, str):
            return entry.replace(substring, "")
        if isinstance(entry, dict):
            return remove_substring_recursively_from_dict(entry, substring)
        return entry

    try:
        for key, value in data_dict.items():
            if isinstance(value, str):
                data_dict[key] = value.replace(substring, "")
            elif isinstance(value, list):
                data_dict[key] = [_clean_list_entry(entry) for entry in value]
            elif isinstance(value, dict):
                data_dict[key] = remove_substring_recursively_from_dict(value, substring)
    except AttributeError:
        _logger.debug(f"Input is not a dictionary: {data_dict}")
    return data_dict
[docs]
def sort_arrays(*args):
    """
    Sort arrays according to the order of the first array.

    The permutation that sorts the first array (stable for equal entries) is applied
    to all arrays, so that entries at the same position stay together. Entries
    of an array without a counterpart in the first array are dropped.

    Parameters
    ----------
    *args
        Arguments to be sorted.

    Returns
    -------
    list
        Sorted args (list of lists); the empty args tuple if no arguments are given.
    """
    if len(args) == 0:
        return args

    columns = [list(arg) for arg in args]
    keys = columns[0]
    # Sort indices by the key array only: one permutation shared by all arrays,
    # never comparing the entries of the other arrays.
    order = sorted(range(len(keys)), key=keys.__getitem__)
    return [[column[i] for i in order if i < len(column)] for column in columns]
def user_confirm():
    """
    Prompt the user on the command line for a y/n answer (case-insensitive).

    Returns
    -------
    bool:
        True if the answer is Y/y. False for any other answer or if the
        input stream ends (EOF).
    """
    try:
        answer = input("Is this OK? [y/n]").lower()
    except EOFError:
        return False
    return answer == "y"
def _get_value_dtype(value):
"""
Get the data type of the given value.
Parameters
----------
Value to determine the data type.
Returns
-------
type:
Data type of the value.
"""
if isinstance(value, (list | np.ndarray)):
value = np.array(value)
return value.dtype
return type(value)
def validate_data_type(reference_dtype, value=None, dtype=None, allow_subtypes=True):
    """
    Check the data type of a value (or a type object) against a reference data type.

    With allow_subtypes, string-like types match 'string', 'str' and 'file' (the latter
    as used in the model parameter database), booleans match 'boolean' and 'bool',
    integers match integer and floating point references, and floating point types
    match floating point references. Without allow_subtypes, the result of
    np.issubdtype(dtype, reference_dtype) is returned.

    Parameters
    ----------
    reference_dtype: str
        Reference data type to be checked against.
    value: any, optional
        Value to be checked (if dtype is None).
    dtype: type, optional
        Type object to be checked (if value is None).
    allow_subtypes: bool, optional
        If True, allow subtypes to be accepted.

    Returns
    -------
    bool:
        True if the data type is valid.

    Raises
    ------
    ValueError
        If neither value nor dtype is given.
    """
    if value is None and dtype is None:
        raise ValueError("Either value or dtype must be given.")

    if dtype is None:
        # value is guaranteed to be given at this point
        dtype = _get_value_dtype(value)

    if not allow_subtypes:
        return np.issubdtype(dtype, reference_dtype)

    string_like = np.issubdtype(dtype, np.str_) or np.issubdtype(dtype, "object")
    if string_like and reference_dtype in ("string", "str", "file"):
        return True

    if np.issubdtype(dtype, np.bool_) and reference_dtype in ("boolean", "bool"):
        return True

    # Integers are accepted for both integer and floating point references.
    if np.issubdtype(dtype, np.integer) and any(
        np.issubdtype(reference_dtype, kind) for kind in (np.integer, np.floating)
    ):
        return True

    return np.issubdtype(dtype, np.floating) and np.issubdtype(reference_dtype, np.floating)
def convert_list_to_string(data, comma_separated=False, shorten_list=False, collapse_list=False):
    """
    Convert arrays to string (if required).

    Parameters
    ----------
    data: object
        Object of data to convert (e.g., double or list)
    comma_separated: bool
        If True, returns elements as a comma-separated string (default is space-separated).
    shorten_list: bool
        If True, the list has more than 10 elements, and all elements are (numerically
        close to) identical, returns a summary string like "all: value".
        This is useful to make the configuration files more readable.
    collapse_list: bool
        If True and all elements in the list are identical, returns a single value
        instead of the entire list.

    Returns
    -------
    object or str:
        Converted data as string (if required); data itself if it is neither
        a list nor a numpy array.
    """
    if not isinstance(data, (list, np.ndarray)):
        return data

    if shorten_list and len(data) > 10 and all(np.isclose(item, data[0]) for item in data):
        return f"all: {data[0]}"

    # Counting distinct elements needs no ordering; sorting here would fail for
    # lists mixing types that cannot be compared (e.g. numbers and strings).
    if collapse_list and len(set(data)) == 1:
        data = [data[0]]

    separator = ", " if comma_separated else " "
    return separator.join(str(item) for item in data)
def convert_string_to_list(data_string, is_float=True):
    """
    Convert string (as used e.g. in sim_telarray) to list.

    A whitespace-separated string of numbers is converted to a list of numbers.
    If this fails, comma-separated or space-separated strings are split into
    a list of strings.

    Parameters
    ----------
    data_string: object
        String to be converted
    is_float: bool
        If True, numbers are converted to float; otherwise to int.

    Returns
    -------
    list, str
        Converted data from string (if required).
        Return data_string if conversion fails.
    """
    to_number = float if is_float else int
    try:
        return [to_number(token) for token in data_string.split()]
    except ValueError:
        # Not purely numeric: fall back to splitting into strings.
        pass

    if "," in data_string:
        return [item.strip() for item in data_string.split(",")]
    return data_string.split() if " " in data_string else data_string
def _load_yaml_using_astropy(file):
    """
    Read a yaml file with the yaml loader provided by astropy.

    The file is rewound to its beginning before loading.

    Parameters
    ----------
    file: file
        Open file object to be loaded.

    Returns
    -------
    dict
        Dictionary containing the file content.
    """
    # pylint: disable=import-outside-toplevel
    from astropy.io.misc import yaml as astropy_yaml

    file.seek(0)
    content = astropy_yaml.load(file)
    return content
def read_file_encoded_in_utf_or_latin(file_name):
    """
    Read a file encoded in UTF-8 or Latin-1.

    UTF-8 is tried first; Latin-1 is used as fallback if decoding fails.

    Parameters
    ----------
    file_name: str
        Name of the file to be read.

    Returns
    -------
    list
        List of lines read from the file.

    Raises
    ------
    UnicodeDecodeError
        If the file cannot be decoded using UTF-8 or Latin-1.
    """
    try:
        with open(file_name, encoding="utf-8") as file:
            return file.readlines()
    except UnicodeDecodeError:
        _logger.debug("Unable to decode file using UTF-8. Trying Latin-1.")

    try:
        with open(file_name, encoding="latin-1") as file:
            return file.readlines()
    except UnicodeDecodeError as exc:
        # UnicodeDecodeError requires five constructor arguments; reuse the details
        # of the original error and replace only the reason.
        raise UnicodeDecodeError(
            exc.encoding,
            exc.object,
            exc.start,
            exc.end,
            "Unable to decode file using UTF-8 or Latin-1.",
        ) from exc