from __future__ import annotations
import os.path
import uuid
from dataclasses import dataclass, field
from typing import Optional, Generator
# Use correct Self type depending upon Python version
import sys
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
from uuid import UUID
import re
import ResSimpy.Nexus.nexus_file_operations as nfo
import warnings
from ResSimpy.Nexus.NexusKeywords.structured_grid_keywords import GRID_OPERATION_KEYWORDS, GRID_ARRAY_FORMAT_KEYWORDS, \
GRID_ARRAY_KEYWORDS
from ResSimpy.Utils.factory_methods import get_empty_list_str, get_empty_list_nexus_file, \
get_empty_dict_uuid_list_int
from ResSimpy.File import File
import pathlib
import os
from datetime import datetime, timezone
[docs]@dataclass(kw_only=True, repr=True)
class NexusFile(File):
"""Class to deal with origin and structure of Nexus files and preserve origin of include files.
Attributes:
location (Optional[str]): Path to the original file being opened. Defaults to None.
include_locations (Optional[list[str]]): list of file paths that the file contains. Defaults to None.
origin (Optional[str]): Where the file was opened from. Defaults to None.
include_objects (Optional[list[NexusFile]]): The include files but generated as a NexusFile instance. \
Defaults to None.
object_locations (Optional[dict[UUID, list[int]]]): maps the id of an object to the line indices, in the \
flattened file, where that object is stored. Defaults to None.
line_locations (Optional[list[tuple[int, UUID]]]): (flattened line index, file_id) pairs recorded by \
iterate_line each time the content of a file starts or resumes. Defaults to None.
linked_user (Optional[str]): user or owner of the file. Defaults to None
last_modified (Optional[datetime]): last modified date of the file
"""
# Paths of the files referenced by INCLUDE statements in this file.
include_locations: Optional[list[str]] = field(default=None)
# Path of the file that this file was opened from (None when not supplied).
origin: Optional[str] = None
# The included files as NexusFile instances; excluded from the generated repr.
include_objects: Optional[list[NexusFile]] = field(default=None, repr=False)
# object id -> line indices in the flattened file; excluded from the generated repr.
object_locations: Optional[dict[UUID, list[int]]] = field(default=None, repr=False)
# (flattened line index, file_id) boundaries written by iterate_line; excluded from the generated repr.
line_locations: Optional[list[tuple[int, UUID]]] = field(default=None, repr=False)
# Owner (and group, when available) of the file on disk.
linked_user: Optional[str] = field(default=None)
# Time the file was last modified.
last_modified: Optional[datetime] = field(default=None)
def __init__(self, location: Optional[str] = None,
             include_locations: Optional[list[str]] = None,
             origin: Optional[str] = None,
             include_objects: Optional[list[NexusFile]] = None,
             file_content_as_list: Optional[list[str]] = None,
             linked_user: Optional[str] = None,
             last_modified: Optional[datetime] = None) -> None:
    """Create a NexusFile, resolving its location against the file it was opened from.

    Args:
        location (Optional[str]): path to the file as written by the caller. Defaults to None.
        include_locations (Optional[list[str]]): paths of the files this file includes. Defaults to None.
        origin (Optional[str]): file this file was opened from. When both origin and location are given
            the stored location is the result of nfo.get_full_file_path(location, origin).
        include_objects (Optional[list[NexusFile]]): the included files as NexusFile instances.
        file_content_as_list (Optional[list[str]]): content of the file, one entry per line.
        linked_user (Optional[str]): user or owner of the file. Defaults to None.
        last_modified (Optional[datetime]): last modified date of the file. Defaults to None.
    """
    super().__init__(location=location, file_content_as_list=file_content_as_list)

    resolve_against_origin = origin is not None and location is not None
    if resolve_against_origin:
        self.location = nfo.get_full_file_path(location, origin)
    else:
        self.location = location
    # Keep the path exactly as it was supplied, before any resolution against origin.
    self.input_file_location: Optional[str] = location

    if include_locations is None:
        include_locations = get_empty_list_str()
    self.include_locations: Optional[list[str]] = include_locations

    self.origin: Optional[str] = origin

    if include_objects is None:
        include_objects = get_empty_list_nexus_file()
    self.include_objects: Optional[list[NexusFile]] = include_objects

    # Replace any still-unset containers with fresh, per-instance empty ones.
    if self.object_locations is None:
        self.object_locations: dict[UUID, list[int]] = get_empty_dict_uuid_list_int()
    if self.line_locations is None:
        self.line_locations = []

    # Identifier used to tell files apart in line_locations.
    self.file_id = uuid.uuid4()
    self.linked_user = linked_user
    self.last_modified = last_modified
@classmethod
def generate_file_include_structure(cls, file_path: str, origin: Optional[str] = None, recursive: bool = True,
                                    skip_arrays: bool = True, top_level_file: bool = True) -> Self:
    """Generates a nexus file instance for a provided text file with information storing the included files.

    The owner and last-modified time of the file are looked up on a best-effort basis: when they cannot be
    read (for example because the file does not exist) they are left as None instead of raising, so that a
    missing file is reported through the warning below.

    Args:
        file_path (str): path to a file
        origin (Optional[str], optional): Where the file was opened from. Defaults to None.
        recursive (bool): Whether the method should recursively drill down multiple layers of include_locations.
        skip_arrays (bool): If set True skips the INCLUDE arrays that come after property array and VALUE
        top_level_file (bool): If set to True, the code assumes this is a 'top level' file rather than an included
            one.

    Returns:
        NexusFile: a class instance for NexusFile with knowledge of include files. If the file cannot be
        found a UserWarning is issued and an instance without content is returned.
    """

    def _owner_and_group(path: Optional[str]) -> Optional[str]:
        """Return 'owner:group' (or only the owner) of path, or None when the owner cannot be determined."""
        if not path:
            return None
        pathlib_path = pathlib.Path(path)
        # owner()/group() raise OSError for a missing file, KeyError when the id has no account entry and
        # NotImplementedError on platforms without support.
        lookup_errors = (OSError, KeyError, NotImplementedError)
        try:
            owner = pathlib_path.owner()
        except lookup_errors:
            return None
        try:
            group = pathlib_path.group()
        except lookup_errors:
            group = None
        if owner is not None and group is not None:
            return f"{owner}:{group}"
        return owner

    def _last_modified(path: Optional[str]) -> Optional[datetime]:
        """Return the modification time of path as a UTC datetime, or None when it cannot be read."""
        if not path:
            return None
        try:
            timestamp = os.stat(path).st_mtime
        except OSError:
            return None
        if timestamp is None:
            return None
        return datetime.fromtimestamp(timestamp, tz=timezone.utc)

    full_file_path = file_path
    if origin is not None:
        full_file_path = nfo.get_full_file_path(file_path, origin)

    user = _owner_and_group(full_file_path)
    last_changed = _last_modified(full_file_path)

    try:
        file_as_list = nfo.load_file_as_list(full_file_path)
    except FileNotFoundError:
        # handle if a file can't be found
        missing_file = cls(location=file_path,
                           include_locations=None,
                           origin=origin,
                           include_objects=None,
                           file_content_as_list=None,
                           linked_user=user,
                           last_modified=last_changed)
        warnings.warn(UserWarning(f'No file found for: {file_path} while loading {origin}'))
        return missing_file

    # Loop invariant: an INCLUDE directly preceded by one of these is not loaded when skip_arrays is set.
    keywords_to_skip_include = GRID_ARRAY_FORMAT_KEYWORDS + GRID_OPERATION_KEYWORDS + ["CORP"]

    # Content is rebuilt in a separate list so the list being iterated over is never mutated.
    modified_file_as_list: list[str] = []
    inc_file_list: list[str] = []
    includes_objects: list[NexusFile] = []
    skip_next_include = False

    for i, line in enumerate(file_as_list):
        # A stored line ending in '>' is joined with the line that follows it.
        if modified_file_as_list:
            previous_line = modified_file_as_list[-1].rstrip('\n')
            if previous_line.endswith('>'):
                modified_file_as_list[-1] = previous_line[:-1] + line
            else:
                modified_file_as_list.append(line)
        else:
            modified_file_as_list.append(line)
        if line.rstrip('\n').endswith('>'):
            continue

        if nfo.check_token("INCLUDE", line):
            # Include found, check if we should skip loading it in (e.g. if it is a large array file)
            previous_value = nfo.get_previous_value(file_as_list=file_as_list[0: i + 1],
                                                    search_before='INCLUDE',
                                                    ignore_values=['NOLIST'])
            if previous_value is None:
                skip_next_include = False
            elif previous_value.upper() in keywords_to_skip_include:
                skip_next_include = True
        elif nfo.check_token("VALUE", line) and not top_level_file:
            # Check if this is an 'embedded' grid array file. If it is, return this file with only the content up
            # to this point to help with performance when analysing the files.
            previous_value = nfo.get_previous_value(file_as_list=file_as_list[0: i + 1], search_before='VALUE')
            next_value = nfo.get_next_value(start_line_index=0, file_as_list=file_as_list[i:],
                                            search_string=line.upper().split('VALUE')[1])
            if previous_value is None or next_value is None:
                continue
            if next_value.upper() != 'INCLUDE' and previous_value.upper() in GRID_ARRAY_KEYWORDS:
                # Carry the file metadata on this path too, as every other return does.
                return cls(location=file_path,
                           include_locations=inc_file_list,
                           origin=origin,
                           include_objects=includes_objects,
                           file_content_as_list=modified_file_as_list,
                           linked_user=user,
                           last_modified=last_changed)
            continue
        else:
            continue

        inc_file_path = nfo.get_token_value('INCLUDE', line, file_as_list)
        if inc_file_path is None:
            continue
        inc_full_path = nfo.get_full_file_path(inc_file_path, origin=full_file_path)
        # store the included files as files inside the object
        inc_file_list.append(inc_full_path)
        if not recursive:
            continue

        if skip_arrays and skip_next_include:
            # Record the include without reading its content.
            # NOTE(review): this stores the including file's owner/mtime on the skipped include - confirm
            # that is intended.
            inc_file = cls(location=inc_file_path,
                           include_locations=None,
                           origin=full_file_path,
                           include_objects=None,
                           file_content_as_list=None,
                           linked_user=user,
                           last_modified=last_changed)
            skip_next_include = False
        else:
            inc_file = cls.generate_file_include_structure(inc_file_path, origin=full_file_path, recursive=True,
                                                           skip_arrays=skip_arrays, top_level_file=False)
        includes_objects.append(inc_file)

    return cls(location=file_path,
               include_locations=inc_file_list,
               origin=origin,
               # An empty list is passed on as None, as before.
               include_objects=includes_objects or None,
               file_content_as_list=modified_file_as_list,
               linked_user=user,
               last_modified=last_changed)
[docs] def export_network_lists(self):
"""Exports lists of connections from and to for use in network graphs.
Raises:
ValueError: If the from and to lists are not the same length
Returns:
tuple[list]: list of to and from file paths where the equivalent indexes relate to a connection
"""
from_list = [self.origin]
to_list = [self.location]
if not [self.origin]:
to_list = []
if self.include_locations is not None:
from_list += [self.location] * len(self.include_locations)
to_list += self.include_locations
if len(from_list) != len(to_list):
raise ValueError(
f"{from_list=} and {to_list=} are not the same length")
return from_list, to_list
[docs] @dataclass
class FileIndex:
# Mutable line counter for iterate_line: one instance, created with index=0, is passed down to every
# recursive call so that all nested files advance the same running count of yielded lines.
index: int
def iterate_line(self, file_index: Optional[FileIndex] = None, max_depth: Optional[int] = None,
                 parent: Optional[NexusFile] = None, prefix_line: Optional[str] = None,
                 keep_include_references=False) -> \
        Generator[str, None, None]:
    """Generator object for iterating over a list of strings with nested NexusFile objects in them.

    INCLUDE statements are replaced by the lines of the matching entry in include_objects. While
    iterating, (flattened line index, file_id) pairs are appended to parent.line_locations every time the
    content of a file starts or resumes.

    Args:
        file_index (Optional[FileIndex]): running count of yielded lines, shared with every recursive call.
            A new counter starting at 0 is created when None.
        max_depth (Optional[int]): number of include levels to expand. None expands every level. An
            INCLUDE that is not expanded (depth exhausted or no matching include object) is skipped.
        parent (Optional[NexusFile]): file whose line_locations is updated. When None this file is used
            and its line_locations is reset first.
        prefix_line (Optional[str]): text that preceded the include path on the including line. It is
            yielded first unless it is None or a single space.
        keep_include_references (bool): if True the INCLUDE line itself is also yielded ahead of the
            expanded content.

    Yields:
        str: sequential line from the file.

    Raises:
        ValueError: if an INCLUDE statement is found but include_objects is None.
    """
    if file_index is None:
        file_index = NexusFile.FileIndex(index=0)
    if parent is None:
        parent = self
        parent.line_locations = []
    if parent.line_locations is None:
        parent.line_locations = []
    if prefix_line is not None and prefix_line != ' ':
        file_index.index += 1
        yield prefix_line

    # Mark where the content of this file starts in the flattened output.
    new_entry = (file_index.index, self.file_id)
    if new_entry not in parent.line_locations:
        parent.line_locations.append(new_entry)

    depth: int = 0 if max_depth is None else max_depth
    if self.file_content_as_list is None:
        warnings.warn(f'No file content found for file: {self.location}')
        return

    for row in self.file_content_as_list:
        if not nfo.check_token('INCLUDE', row):
            file_index.index += 1
            yield row
            continue

        incfile_location = nfo.get_token_value('INCLUDE', row, self.file_content_as_list)
        if incfile_location is None:
            continue

        # The path is literal text, not a pattern: escape it so that characters such as '.', '+', '(' or a
        # backslash cannot mis-split the line or raise re.error.
        split_line = re.split(re.escape(incfile_location), row, maxsplit=1, flags=re.IGNORECASE)
        if len(split_line) == 2:
            prefix_line, suffix_line = split_line
            prefix_line = re.sub('INCLUDE', '', prefix_line, flags=re.IGNORECASE)
            prefix_line = prefix_line.rstrip() + ' '
            suffix_line = suffix_line.lstrip()
        else:
            prefix_line = row.replace(incfile_location, '')
            suffix_line = None

        if self.include_objects is None:
            raise ValueError(f'No include objects found in the nexusfile to expand over for file: '
                             f'{self.location}')

        # Match the include by exact path, then by the path resolved against origin, then by file name.
        include_file = None
        for obj in self.include_objects:
            if obj.location == incfile_location \
                    or (self.origin is not None
                        and obj.location == nfo.get_full_file_path(incfile_location, self.origin)) \
                    or (obj.location is not None
                        and os.path.basename(obj.location) == os.path.basename(incfile_location)):
                include_file = obj
                break

        if include_file is None or not (max_depth is None or depth > 0):
            continue

        level_down_max_depth = None if max_depth is None else depth - 1
        if keep_include_references:
            # NOTE(review): this line does not advance file_index and the flag is not passed on to nested
            # includes - confirm both are intended.
            yield row
        yield from include_file.iterate_line(file_index=file_index, max_depth=level_down_max_depth,
                                             parent=parent, prefix_line=prefix_line)

        # Mark where the content of this file resumes after the include.
        new_entry = (file_index.index, self.file_id)
        if new_entry not in parent.line_locations:
            parent.line_locations.append(new_entry)
        if suffix_line:
            file_index.index += 1
            # Add in space between include location and the rest of the line
            suffix_line = ' ' + suffix_line
            yield suffix_line
@property
def get_flat_list_str_file(self) -> list[str]:
if self.file_content_as_list is None:
raise ValueError(f'No file content found for {self.location}')
flat_list = list(self.iterate_line(file_index=None, keep_include_references=False))
return flat_list
@property
def get_flat_list_str_file_including_includes(self) -> list[str]:
if self.file_content_as_list is None:
raise ValueError(f'No file content found for {self.location}')
flat_list = list(self.iterate_line(file_index=None, keep_include_references=True))
return flat_list
# TODO write an output function using the iterate_line method
[docs] def get_full_network(self, max_depth: Optional[int] = None) -> tuple[list[str | None], list[str | None]]:
"""Recursively constructs two lists of from and to nodes representing the connections between files.
Args:
max_depth (Optional[int], optional): depth of the iteration to construct the network down to. \
Defaults to None.
Returns:
tuple[list[str | None], list[str | None]]: two lists of from and to nodes where corresponding \
indices create an edge within a graph network. e.g. (from_list[i], to_list[i]) \
is a connection between two files.
"""
depth: int = 0
from_list = [self.origin]
to_list = [self.location]
if max_depth is not None:
depth = max_depth
if self.file_content_as_list is None:
return from_list, to_list
for row in self.file_content_as_list:
if isinstance(row, NexusFile):
if (max_depth is None or depth > 0):
level_down_max_depth = None if max_depth is None else depth - 1
temp_from_list, temp_to_list = row.export_network_lists()
from_list.extend(temp_from_list)
to_list.extend(temp_to_list)
temp_from_list, temp_to_list = row.get_full_network(max_depth=level_down_max_depth)
from_list.extend(temp_from_list)
to_list.extend(temp_to_list)
return from_list, to_list
[docs] def add_object_locations(self, obj_uuid: UUID, line_indices: list[int]) -> None:
"""Adds a uuid to the object_locations dictionary. Used for storing the line numbers where objects are stored
within the flattened file_as_list.
Args:
obj_uuid (UUID): unique identifier of the object being created/stored.
line_indices (list[int]): line number in the flattened file_content_as_list
(i.e. from the get_flat_list_str_file method).
"""
if self.object_locations is None:
self.object_locations: dict[UUID, list[int]] = get_empty_dict_uuid_list_int()
existing_line_locations = self.object_locations.get(obj_uuid, None)
if existing_line_locations is not None:
existing_line_locations.extend(line_indices)
existing_line_locations.sort()
else:
self.object_locations[obj_uuid] = line_indices
def __update_object_locations(self, line_number: int, number_additional_lines: int) -> None:
"""Updates the object locations in a nexusfile by the additional lines. Used when files have been modified and
an addition/removal of lines has occurred. Ensures that the object locations are correct to the actual lines
in the file_as_list.
Args:
line_number (int): Line number at which the new lines have been added
number_additional_lines (int): number of new lines added.
"""
if self.object_locations is None:
return
for object_id, list_indices in self.object_locations.items():
for i, obj_index in enumerate(list_indices):
if obj_index >= line_number:
self.object_locations[object_id][i] = obj_index + number_additional_lines
def __remove_object_locations(self, obj_uuid: UUID) -> None:
"""Removes an object location based on the obj_uuid provided. Used when removing objects in the file_as_list.
Args:
obj_uuid (UUID): id of the removed object.
"""
if self.object_locations is None:
raise ValueError(f'No object locations found for file {self.location}')
if self.object_locations.get(obj_uuid, None) is None:
raise ValueError(f'No object with {obj_uuid=} found within the object locations')
self.object_locations.pop(obj_uuid, None)
[docs] def find_which_include_file(self, flattened_index: int) -> tuple[File, int]:
"""Given a line index that relates to a position within the flattened file_as_list from the method
get_flat_file_as_list.
Args:
flattened_index (int): index in the flattened file as list structure
Returns:
tuple[File, int] where the first element is the file that the relevant line is in and the second
element is the relative index in that file.
"""
if self.line_locations is None:
# call get_flat_list_str_file to ensure line locations are updated
self.get_flat_list_str_file
if self.line_locations is None:
raise ValueError("No include line locations found.")
line_locations = [x[0] for x in self.line_locations]
line_locations.sort()
uuid_index: Optional[UUID] = None
index_in_included_file = 0
# Find the Nexusfile containing the line we are looking for
for numlines, obj_id in self.line_locations:
if numlines <= flattened_index:
uuid_index = obj_id
if numlines >= flattened_index:
break
get_next_line_count = False
lines_already_included = 0
previous_lines_value = 0
# Add the previously included lines from the file (if any) to the location after it has been split
for numlines, obj_id in self.line_locations:
if obj_id == uuid_index:
get_next_line_count = True
previous_lines_value = numlines
continue
else:
# If we have gone beyond where the line is, calculate the location in the relevant file
if numlines >= flattened_index:
lines_already_included += flattened_index - previous_lines_value
break
if get_next_line_count:
lines_already_included += numlines - previous_lines_value
get_next_line_count = False
previous_lines_value = numlines
if flattened_index > line_locations[-1]:
lines_already_included += flattened_index - line_locations[-1]
index_in_included_file += lines_already_included
if uuid_index == self.file_id or self.include_objects is None:
return self, index_in_included_file
nexus_file = None
for file in self.include_objects:
if file.file_id == uuid_index:
nexus_file = file
elif file.include_objects is not None:
# CURRENTLY THIS ONLY SUPPORTS 2 LEVELS OF INCLUDES
for lvl_2_include in file.include_objects:
if lvl_2_include.file_id == uuid_index:
nexus_file = lvl_2_include
if nexus_file is None:
raise ValueError(f'No file with {uuid_index=} found within include objects')
return nexus_file, index_in_included_file
def add_to_file_as_list(self, additional_content: list[str], index: int,
                        additional_objects: Optional[dict[UUID, list[int]]] = None,
                        comments: Optional[str] = None) -> None:
    """Inserts lines into the file that owns the given flattened index and writes that file out.

    Existing object locations at or after the index are shifted by the number of lines inserted, and any
    additional objects supplied are registered afterwards.

    Args:
        additional_content (list[str]): Additional lines as a list of strings to be added.
        index (int): index to insert the new lines at in the calling flat_file_as_list
        additional_objects (Optional[dict[UUID, int]]): defaults to None. Otherwise, a dictionary keyed with the \
            UUID of the new objects to add as well as the corresponding index of the object in the original \
            calling NexusFile
        comments (str | None): defaults to None. Comments to add in-line to the file.

    Raises:
        ValueError: if the file that owns the index has no content.
    """
    new_lines = additional_content
    if comments is not None:
        new_lines = NexusFile.insert_comments(additional_content, comments)

    target_file, insert_at = self.find_which_include_file(index)
    current_content = target_file.file_content_as_list
    if current_content is None:
        raise ValueError(f'No file content to write to in file: {target_file}')

    before, after = current_content[:insert_at], current_content[insert_at:]
    target_file.file_content_as_list = before + new_lines + after
    self._file_modified_set(True)

    # write straight to file
    target_file.write_to_file()

    # update object locations
    self.__update_object_locations(line_number=index, number_additional_lines=len(new_lines))

    if additional_objects is not None:
        for object_id, line_index in additional_objects.items():
            self.add_object_locations(obj_uuid=object_id, line_indices=line_index)
[docs] def remove_object_from_file_as_list(self, objects_to_remove: list[UUID]) -> None:
"""Removes all associated lines in the file as well as the object locations relating to a list of objects.
Args:
objects_to_remove (list[UUID]): ids of the objects to remove. Ids with no entry in object_locations \
are skipped.
Raises:
ValueError: if object_locations is None.
"""
if self.object_locations is None:
raise ValueError('Cannot remove object from object_locations as object_locations is None. '
'Check object locations is being populated properly.')
for obj_to_remove in objects_to_remove:
# find all locations in the code that relate to the object
obj_locs = self.object_locations.get(obj_to_remove, None)
if obj_locs is None:
continue
# sort from highest to lowest to ensure line indices are not affected by removal of lines
# (sorted returns a new list, so the entry stored in object_locations can change while it is walked)
sorted_obj_locs = sorted(obj_locs, reverse=True)
for i, index in enumerate(sorted_obj_locs):
if i == 0:
# for the first removal remove the object location
self.remove_from_file_as_list(index, objects_to_remove=[obj_to_remove])
else:
# the remaining iterations remove just the lines
self.remove_from_file_as_list(index)
# _file_modified_set is not defined in this class; presumably inherited from File and flags the file as
# modified - confirm.
self._file_modified_set(True)
def remove_from_file_as_list(self, index: int, objects_to_remove: Optional[list[UUID]] = None,
                             string_to_remove: Optional[str] = None) -> None:
    """Removes a line, or part of a line, from the file that owns the given flattened index and writes it out.

    When a whole line is removed, stored object locations at or after the index move up by one. Any
    objects listed are dropped from the object locations dictionary.

    Args:
        index (int): index in the calling flat_file_as_list to remove the entry from
        objects_to_remove (Optional[list[UUID]]): list of object id's to remove from the object locations. \
            Defaults to None
        string_to_remove (Optional[str]): if specified will only remove the first occurrence of the listed \
            string from the entry at the index. Defaults to None, which removes the entire entry.

    Raises:
        ValueError: if the owning file has no content, or if a string is to be removed from an entry that
            is not a string.
    """
    target_file, line_in_file = self.find_which_include_file(index)
    lines = target_file.file_content_as_list
    # remove the line in the file:
    if lines is None:
        raise ValueError(
            f'No file content in the file attempting to remove line from {target_file.location}')

    if string_to_remove is not None:
        # Only strip the requested text; the line itself stays, so no locations move.
        current_entry = lines[line_in_file]
        if not isinstance(current_entry, str):
            raise ValueError(
                f'Tried to replace at non string value at index: {line_in_file} in '
                f'file_as_list instead got {current_entry}')
        lines[line_in_file] = current_entry.replace(string_to_remove, '', 1)
    else:
        lines.pop(line_in_file)
        self.__update_object_locations(line_number=index, number_additional_lines=-1)

    for object_id in objects_to_remove or ():
        self.__remove_object_locations(object_id)

    self._file_modified_set(True)
    target_file.write_to_file()
[docs] def get_object_locations_for_id(self, id: UUID) -> list[int]:
"""Gets the number of object locations for a specified id."""
if self.object_locations is None or len(self.object_locations[id]) == 0:
raise ValueError(f'No object locations specified, cannot find id: {id} in {self.object_locations}')
return self.object_locations[id]