Source code for ResSimpy.Nexus.logfile_operations

from __future__ import annotations

import os
from datetime import datetime
from typing import Optional, TYPE_CHECKING
import ResSimpy.Nexus.nexus_file_operations as nfo

if TYPE_CHECKING:
    from ResSimpy.Nexus.NexusSimulator import NexusSimulator


[docs]class Logging:
[docs] def __init__(self, model: NexusSimulator) -> None: """Class for controlling all logging and logfile (*.log) related functionality. Args: model: NexusSimulator instance __job_id (int): Run job ID for executed runs __simulation_start_time (Optional[str]): Execution start time of the simulation when submitted \ to calculation engine __simulation_end_time (Optional[str]): Execution end time of the last time the simulation was run __previous_run_time (Optional[str]): Difference between simulation execution start time and end time. """ self.__model = model self.__job_id: int = -1 self.__simulation_start_time: Optional[str] = None self.__simulation_end_time: Optional[str] = None self.__previous_run_time: Optional[str] = None
[docs] @staticmethod def get_simulation_time(line: str) -> str: """Searches for the simulation time in a line. Args: line (str): line to search for the simulation time Raises: ValueError: Throws error if get_next_value doesn't find any subsequent value in the line Returns: str: value found after TIME card in a line """ value_found = False value = '' line_string = line while value_found is False: next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is None: raise ValueError( f'No next value found in the line supplied, line: {line_string}') if next_value == 'on': line_string = line_string.replace(next_value, '', 1) next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is None: raise ValueError( f'No next value found in the line supplied, line: {line_string}') for c in range(6): line_string = line_string.replace(next_value, '', 1) next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is None: raise ValueError( f'No next value found in the line supplied, line: {line_string}') value += next_value + (' ' if c < 5 else '') value_found = True line_string = line_string.replace(next_value, '', 1) return value
[docs] @staticmethod def convert_server_date(original_date: str) -> datetime: """Convert a datetime string from the server for when the simulation was started to a strptime object. Args: original_date (str): string of a date with format: "Mon Jan 01 00:00:00 CST 2020" Returns: datetime: datetime object derived from the input string """ date_format = '%a %b %d %X %Z %Y' converted_date = original_date # Convert CDT and CST timezones as Python doesn't work with CDT for some reason if 'CDT' in original_date: converted_date = converted_date.replace('CDT', '-0500', 1) date_format = '%a %b %d %X %z %Y' elif 'CST' in original_date: converted_date = converted_date.replace('CST', '-0600', 1) date_format = '%a %b %d %X %z %Y' return datetime.strptime(converted_date, date_format)
[docs] @staticmethod def get_errors_warnings_string(log_file_line_list: list[str]) -> Optional[str]: """Retrieves the number of warnings and errors from the simulation log output, and formats them as a string. Args: log_file_line_list (list[str]): log file formatted as a list of strings with \ a new list entry per line Returns: Optional[str]: string containing the errors and warnings from the simulation log. \ None if error/warning set is too short """ error_warning = "" for line in log_file_line_list: modified_line = line.lower() if "errors" in modified_line and "warnings" in modified_line: error_warning = modified_line error_warning_list = [x for x in error_warning.split(" ") if x != ""] error_warning_list = [nfo.clean_up_string(x) for x in error_warning_list] if len(error_warning_list) < 4: return None errors = error_warning_list[1] warnings = error_warning_list[3] error_warning_str = f"Simulation complete - Errors: {errors} and Warnings: {warnings}" return error_warning_str
[docs] def get_simulation_start_time(self) -> str: """Get the start time of an executed simulation run, if no simulation start time returns '-'.""" self.get_simulation_status() if self.__simulation_start_time is not None: return self.__simulation_start_time else: return '-'
[docs] def get_simulation_end_time(self) -> str: """Get the end time of an executed simulation run if it has completed, if no simulation end time returns '-'.""" self.get_simulation_status() if self.__simulation_end_time is not None: return self.__simulation_end_time else: return '-'
[docs] def get_job_id(self) -> int: """Get the job Id of a simulation run.""" return self.__job_id
def __get_log_path(self, from_startup: bool = False) -> Optional[str]: """Returns the path of the log file for the simulation. Args: from_startup (bool, optional): Searches the same directory as the original_fcs_file_path if True. \ Otherwise searches the destination folder path, failing this then searches the \ original_fcs_file_path if False. Defaults to False. Returns: Optional[str]: The path of the .log file from the simulation if found. If not found returns None. """ folder_path = os.path.dirname( self.__model.original_fcs_file_path) if from_startup else os.path.dirname(self.__model.origin) files = os.listdir(folder_path) original_fcs_file_location = os.path.basename(self.__model.original_fcs_file_path) log_file_name = os.path.splitext(original_fcs_file_location)[ 0] + ".log" if from_startup else self.__model.root_name + ".log" if log_file_name in files: if from_startup: file_location = folder_path else: file_location = self.__model.destination if self.__model.destination is not None else folder_path log_file_path = file_location + "/" + log_file_name return log_file_path else: return None def __update_simulation_start_and_end_times(self, log_file_line_list: list[str]) -> None: """Updates the stored simulation execution start and end times from the log files. Args: log_file_line_list (list[str]): log file information represented with a new entry per line of the file. """ for line in log_file_line_list: if nfo.check_token('start generic pdsh prolog', line): value = self.get_simulation_time(line) self.__simulation_start_time = value if nfo.check_token('end generic pdsh epilog', line): value = self.get_simulation_time(line) self.__simulation_end_time = value
[docs] def get_simulation_status(self, from_startup: bool = False) -> Optional[str]: """Gets the run status of the latest simulation run. Args: from_startup (bool, optional): Searches the same directory as the original_fcs_file_path if True. \ Otherwise searches the destination folder path, failing this then searches the \ original_fcs_file_path if False. Defaults to False. Raises: NotImplementedError: If log file is not found - only supporting simulation status from log files Returns: Optional[str]: the error/warning string if the simulation has finished, otherwise \ returns the running job ID. Empty string if a logfile is not found and from_start up is True """ log_file = self.__get_log_path(from_startup) if log_file is None: if from_startup: return '' raise NotImplementedError( "Only retrieving status from a log file is supported at the moment") else: log_file_line_list = nfo.load_file_as_list(log_file) self.__update_simulation_start_and_end_times(log_file_line_list) job_finished = 'Nexus finished\n' in log_file_line_list if job_finished: self.__previous_run_time = self.__get_start_end_difference() if from_startup \ else self.__previous_run_time return self.get_errors_warnings_string(log_file_line_list=log_file_line_list) else: job_number_line = [ x for x in log_file_line_list if 'Job number:' in x] if len(job_number_line) > 0: self.__job_id = int(job_number_line[0].split(":")[1]) return f"Job Running, ID: {self.__job_id}" return None
def __get_start_end_difference(self) -> Optional[str]: """Returns a string with the previous time taken when the base case was run. Returns: Optional[str]: returns a human readable string of how long the simulation took to run """ if self.__simulation_start_time is None or self.__simulation_end_time is None: return None start_date = self.convert_server_date(self.__simulation_start_time) end_date = self.convert_server_date(self.__simulation_end_time) total_difference = (end_date - start_date) days = int(total_difference.days) hours = int(total_difference.seconds / (60 * 60)) minutes = int((total_difference.seconds / 60) - (hours * 60)) seconds = int(total_difference.seconds - (hours * 60 * 60) - (minutes * 60)) return f"{days} Days, {hours} Hours, {minutes} Minutes {seconds} Seconds"
[docs] def get_base_case_run_time(self) -> str: """Get the time taken for the base case to run. Returns '-' if no run time found.""" if self.__previous_run_time is not None: return self.__previous_run_time else: return '-'
[docs] def get_simulation_progress(self) -> float: """Returns the simulation progress from log files. Raises: NotImplementedError: Only retrieving status from a log file is supported at the moment ValueError: if no times from the runcontrol file are read in Returns: Optional[float]: how long through a simulation run as a proportion of the number of days \ simulated as stated in the runcontrol """ log_file_path = self.__get_log_path() if log_file_path is None: raise NotImplementedError("Only retrieving status from a log file is supported at the moment") log_file = nfo.load_file_as_list(log_file_path) read_in_times = False time_heading_location = None last_time = None for line in log_file: case_name_string = f"Case Name = {self.__model.root_name}" if case_name_string in line: read_in_times = True continue if read_in_times and nfo.check_token('TIME', line): heading_location = 0 line_string = line while len(line_string) > 0: next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is None: break line_string = line_string.replace(next_value, '', 1) if next_value == 'TIME': time_heading_location = heading_location heading_location += 1 if read_in_times and time_heading_location is not None: line_string = line next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is not None and next_value.replace('.', '', 1).isdigit(): if time_heading_location == 0 and (last_time is None or float(next_value) > float(last_time)): last_time = next_value for x in range(time_heading_location): line_string = line_string.replace(next_value, '', 1) next_value = nfo.get_next_value(0, [line_string], line_string) if next_value is None: break # When we reach the time column, read in the time value. if x == (time_heading_location - 1) and \ (last_time is None or float(next_value) > float(last_time)): last_time = next_value if last_time is not None: days_completed = self.__model._sim_controls.convert_date_to_number(last_time) if self.__model._sim_controls.times is None: raise ValueError("No times provided in the instance - please read them in from runcontrol file") total_days = self.__model._sim_controls.convert_date_to_number(self.__model._sim_controls.times[-1]) return round((days_completed / total_days) * 100, 1) return 0