Source code for ResSimpy.Nexus.DataModels.Network.NexusConstraints

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, Mapping, Sequence, cast
from uuid import UUID

import pandas as pd

from ResSimpy.Constraint import Constraint
from ResSimpy.Constraints import Constraints
from ResSimpy.Nexus.nexus_collect_tables import collect_all_tables_to_objects
from ResSimpy.Nexus.DataModels.Network.NexusConstraint import NexusConstraint
from ResSimpy.Nexus.DataModels.NexusFile import NexusFile
from ResSimpy.Enums.UnitsEnum import UnitSystem
from ResSimpy.Utils.obj_to_dataframe import obj_to_dataframe
import ResSimpy.Nexus.nexus_file_operations as nfo

if TYPE_CHECKING:
    from ResSimpy.Nexus.NexusNetwork import NexusNetwork
    from ResSimpy.Nexus.NexusSimulator import NexusSimulator


[docs]@dataclass class NexusConstraints(Constraints): __constraints: dict[str, list[NexusConstraint]] = field(default_factory=lambda: {})
[docs] def __init__(self, parent_network: NexusNetwork, model: NexusSimulator) -> None: self.__parent_network: NexusNetwork = parent_network self.__constraints: dict[str, list[NexusConstraint]] = {} self.__model: NexusSimulator = model
[docs] def get_all(self, object_name: Optional[str] = None, date: Optional[str] = None) -> \ Mapping[str, Sequence[NexusConstraint]]: """Get the constraints of the existing model with optional parameters to filter for name and date Args: object_name (Optional[str]): name of the connection, node or wellname to return. Defaults to None. date (Optional[str]): date in model format to filter the dates to in the constraints Returns: dict[str, list[NexusConstraint]] dictionary of all constraints defined within a model, keyed by the \ name of the well/node. """ self.__parent_network.get_load_status() if object_name is None: constraints_to_return = self.__constraints else: constraints_to_return = {k: v for k, v in self.__constraints.items() if k == object_name} if date is None: return constraints_to_return date_filtered_constraints = {} for constraint_name, constraint_list in constraints_to_return.items(): new_constraint_list = [x for x in constraint_list if x.date == date] if len(new_constraint_list) > 0: date_filtered_constraints[constraint_name] = new_constraint_list return date_filtered_constraints
[docs] def get_df(self) -> pd.DataFrame: """Creates a dataframe representing all processed constraint data in a surface file Returns: DataFrame: of the properties of the constraint through time with each row representing \ a change in constraint. """ self.__parent_network.get_load_status() list_constraints = [] for well_constraints in self.__constraints.values(): list_constraints.extend(well_constraints) obj_to_dataframe(list_constraints) return obj_to_dataframe(list_constraints)
def get_overview(self) -> str: raise NotImplementedError('To be implemented') def load(self, surface_file: NexusFile, start_date: str, default_units: UnitSystem) -> None: # CONSTRAINT keyword represents a table with a header and columns. # CONSTRAINTS keyword represents a list of semi structured constraints with a well_name and then constraints new_constraints = collect_all_tables_to_objects(surface_file, { 'CONSTRAINTS': NexusConstraint, 'CONSTRAINT': NexusConstraint, 'QMULT': NexusConstraint }, start_date=start_date, default_units=default_units) cons_list = new_constraints.get('CONSTRAINTS') if isinstance(cons_list, list): raise ValueError( 'Incompatible data format for additional constraints. Expected type "dict" instead got "list"') self._add_to_memory(cons_list) def _add_to_memory(self, additional_constraints: Optional[dict[str, list[NexusConstraint]]]) -> None: """Adds additional constraints to memory within the NexusConstraints object. If user adds constraints list this will not be reflected in the Nexus deck at this time. Args: ---- additional_constraints (list[NexusConstraint]): additional constraints to add as a list """ if additional_constraints is None: return self.__constraints.update(additional_constraints)
[docs] def find_by_properties(self, object_name: str, constraint_dict: dict[str, None | float | str | int]) -> \ NexusConstraint: """Finds a uniquely matching constraint from a given set of properties in a dictionary of attributes. Args: object_name (str): name of the node to which the constraint is applied (node name/well name) constraint_dict (dict[str, float | str | int]): dictionary of attributes to match on. \ Allows for partial matches if it finds a unique constraint. Returns: NexusConstraint of an existing constraint in the model that uniquely matches the provided \ constraint_dict constraint """ self.__parent_network.get_load_status() found_object_from_network = self.__parent_network.find_network_element_with_dict(object_name, constraint_dict, 'constraints') # ensure typing is consistent if isinstance(found_object_from_network, NexusConstraint): return found_object_from_network else: raise TypeError(f'Wrong object type returned expected NexusConstraint, ' f'instead returned {type(found_object_from_network)}')
[docs] def remove(self, constraint_dict: Optional[dict[str, None | float | str | int]] = None, constraint_id: Optional[UUID] = None) -> None: """Remove a constraint based on closest matching constraint, requires node name and date.\ Needs one of at least constraint dict or constraint id. Args: constraint_dict (Optional[dict[str, float | str | int]]): Constraint matching these attributes will be removed. Defaults to None. constraint_id (Optional[UUID]): Constraint matching this id will be removed. Will not be used if constraint dict is provided. Defaults to None. """ self.__parent_network.get_load_status() if constraint_dict is None and constraint_id is None: raise ValueError('no options provided for both constraint_id and constraint_dict') if constraint_dict is not None: name = constraint_dict.get('name', None) if name is None: raise ValueError(f'No well or node name provided instead got {name}') name = str(name) # check for wildcards if '*' in name: raise NotImplementedError(f'Removing constraints with wildcards is currently unsupported, for {name=}') constraint_to_remove = self.find_by_properties(name, constraint_dict) constraint_id = constraint_to_remove.id if constraint_id is None: raise ValueError(f'No constraint found with {constraint_id=}') # find which file and remove from the file as list surface_file = self.__find_which_surface_file_from_id(constraint_id) surface_file.remove_object_from_file_as_list([constraint_id]) # remove from memory for name, list_constraints in self.__constraints.items(): for i, constraint in enumerate(list_constraints): if constraint.id == constraint_id: list_constraints.pop(i)
def __find_which_surface_file_from_id(self, constraint_id: UUID) -> NexusFile: """Finds the surface file with the object id requested.""" # TODO: make this generic with the find_which_wellspec_file_from_completion_id. if self.__model.model_files.surface_files is None: raise ValueError(f'No surface file found in fcs file at {self.__model.model_files.location}') surface_files = [x for x in self.__model.model_files.surface_files.values() if x.object_locations is not None and constraint_id in x.object_locations] if len(surface_files) == 0: raise FileNotFoundError(f'No surface file found with an existing constraint that has: {constraint_id=}') surface_file = surface_files[0] if surface_file.file_content_as_list is None: raise FileNotFoundError(f'No file content found in file: {surface_file.location} ' f'with an existing constraint that has: {constraint_id=}') return surface_file
[docs] def add(self, name: str, constraint_to_add: dict[str, None | float | int | str | UnitSystem] | Constraint, comments: Optional[str] = None) -> None: """Adds a constraint to the network and corresponding surface file. Args: name (str): name of the node to apply constraints to constraint_to_add (dict[str, float | int | str | UnitSystem] | NexusConstraint): properties of \ the constraints or a constraint object """ self.__parent_network.get_load_status() # check for wildcards if '*' in name: raise NotImplementedError('Adding constraints with wildcards is currently unsupported') # add to memory if isinstance(constraint_to_add, dict): new_constraint = NexusConstraint(constraint_to_add) else: new_constraint = cast(NexusConstraint, constraint_to_add) self._add_to_memory({name: [new_constraint]}) # add to the file if self.__model.model_files.surface_files is None: raise FileNotFoundError('No well file found, cannot modify ') file_to_add_to = self.__model.model_files.surface_files[1] file_as_list = file_to_add_to.get_flat_list_str_file if file_as_list is None: raise ValueError(f'No file content found in the surface file specified at {file_to_add_to.location}') constraint_date = new_constraint.date if constraint_date is None: raise ValueError(f'Require date for adding constraint to, instead got {new_constraint.date}') new_constraint_text = [] date_comparison = -1 date_index = -1 new_constraint_index = -1 id_line_locs = [] new_table_needed = False new_date_needed = False new_qmults_table_needed = False # check for need to add qmult table qmult_keywords = ['qmult_oil_rate', 'qmult_gas_rate', 'qmult_water_rate'] # if any of the qmults are defined in the new constraint then add a qmult table add_qmults = any(getattr(new_constraint, x, None) for x in qmult_keywords) for index, line in enumerate(file_as_list): if nfo.check_token('TIME', line): constraint_date_from_file = nfo.get_expected_token_value('TIME', line, [line]) date_comparison = self.__model._sim_controls.compare_dates(constraint_date_from_file, constraint_date) if date_comparison == 0: date_index = index continue elif date_comparison > 0 and date_index >= 0: # if a date that is greater than the additional constraint then we have overshot and need to # add in a new table or time card # this is the case where we don't need to write a new time card new_table_needed = True new_constraint_index = index - 1 elif date_comparison > 0: new_table_needed = True new_date_needed = True new_constraint_index = index else: continue if nfo.check_token('ENDCONSTRAINTS', line) and date_comparison == 0: # find the end of a constraint table and add the new constraint new_constraint_index = index constraint_string = new_constraint.to_table_line() new_constraint_text.append(constraint_string) id_line_locs = [new_constraint_index] elif index == len(file_as_list) - 1 and date_index >= 0 and not nfo.check_token('ENDQMULT', line): # if we're on the final line of the file and we haven't yet set a constraint index new_table_needed = True new_constraint_index = index if add_qmults: new_qmults_table_needed = True if new_date_needed: # if the date card doesn't exist then add it to the file first new_constraint_text.append(f'TIME {constraint_date}\n') if new_table_needed: new_constraint_text.append('CONSTRAINTS\n') new_constraint_text.append(new_constraint.to_table_line()) new_constraint_text.append('ENDCONSTRAINTS\n') id_line_locs = [new_constraint_index + len(new_constraint_text) - 2] if add_qmults and new_qmults_table_needed: new_constraint_text.extend(new_constraint.write_qmult_table()) # add id location for the qmult table as well id_line_locs.append(new_constraint_index + len(new_constraint_text) - 2) add_qmults = False elif add_qmults and nfo.check_token('ENDQMULT', line) and date_comparison == 0: # find the end of the table of qmults that already exist new_qmult_index = index qmult_string = new_constraint.write_qmult_values() new_qmult_object_ids = {new_constraint.id: [new_qmult_index]} file_to_add_to.add_to_file_as_list(additional_content=[qmult_string], index=new_qmult_index, additional_objects=new_qmult_object_ids) add_qmults = False if new_constraint_index >= 0 and not add_qmults: # once we have found where to add constraint then add the constraint to file and update file ids new_constraint_object_ids = { new_constraint.id: id_line_locs } file_to_add_to.add_to_file_as_list(additional_content=new_constraint_text, index=new_constraint_index, additional_objects=new_constraint_object_ids, comments=comments) break
[docs] def modify(self, name: str, current_constraint: dict[str, None | float | int | str] | Constraint, new_constraint_props: dict[str, None | float | int | str | UnitSystem] | Constraint, comments: Optional[str] = None) \ -> None: """Modify an existing constraint. Retains existing constraint values that are not overridden by the new \ constraint properties. Args: name (str): current_constraint (dict[str, None | float | int | str] | Constraint): dictionary or constraint object\ with enough attributes to identify a unique existing constraint in the model. new_constraint_props (dict[str, None | float | int | str] | Constraint): dictionary or constraint to \ update the constraint with. """ def clean_constraint_inputs(constraint: dict[str, None | float | int | str] | Constraint) -> \ dict[str, None | float | int | str]: """Cleans up an input ensuring consistent type is returned.""" if isinstance(constraint, Constraint): cleaned_dict = constraint.to_dict() else: cleaned_dict = constraint return cleaned_dict cleaned_current_constraint = clean_constraint_inputs(current_constraint) cleaned_new_constraint = clean_constraint_inputs(new_constraint_props) existing_constraint_obj = self.find_by_properties(name, cleaned_current_constraint) self.remove(constraint_id=existing_constraint_obj.id) combination_of_constraints = existing_constraint_obj.to_dict() combination_of_constraints.update(cleaned_new_constraint) self.add(name, combination_of_constraints, comments)