from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from ResSimpy.Nexus.DataModels.Network.NexusConstraint import NexusConstraint
from ResSimpy.Enums.UnitsEnum import UnitSystem
from ResSimpy.Nexus.nexus_file_operations import get_next_value, correct_datatypes
from ResSimpy.Utils.invert_nexus_map import nexus_keyword_to_attribute_name
import fnmatch
if TYPE_CHECKING:
from ResSimpy.File import File
[docs]def load_inline_constraints(file_as_list: list[str], constraint: type[NexusConstraint], current_date: Optional[str],
unit_system: UnitSystem, property_map: dict[str, tuple[str, type]],
existing_constraints: dict[str, list[NexusConstraint]], nexus_file: File,
start_line_index: int, network_names: Optional[list[str]] = None) -> None:
"""Loads table of constraints with the wellname/node first and the constraints following inline
uses previous set of constraints as still applied to the well.
Args:
----
file_as_list (list[str]): file represented as a list of strings
constraint (NexusConstraint): object to store the attributes extracted from each row.
current_date (str): the current date in the table
unit_system (UnitSystem): Unit system enum
property_map (dict[str, tuple[str, type]]): Mapping of nexus keywords to attributes
existing_constraints (dict[str, NexusConstraint]): all existing constraints from previous lines of the \
surface file
network_names (Optional[list[str]]): list of names for all nodes, wells and connections in a nexus network.
Used in deriving constraints from wildcards. Defaults to None
Returns:
-------
dict[UUID, int]: dictionary of object locations derived from inline table.
"""
for index, line in enumerate(file_as_list):
properties_dict: dict[str, str | float | UnitSystem | None] = {'date': current_date, 'unit_system': unit_system}
# first value in the line has to be the node/wellname
name = get_next_value(0, [line])
nones_overwrite = False
constraint_names_to_add: list[str] = []
if name is None:
continue
properties_dict['name'] = name
if '*' in name:
if network_names is None:
raise ValueError('No existing nodes found to add wildcards to')
else:
# filter names that match the pattern
constraint_names_to_add = fnmatch.filter(network_names, name)
else:
constraint_names_to_add.append(name)
trimmed_line = line.replace(name, "", 1)
next_value = get_next_value(0, [trimmed_line])
# loop through the line for each set of constraints
while next_value is not None:
token_value = next_value.upper()
if token_value in ['CLEAR', 'CLEARP', 'CLEARQ', 'CLEARLIMIT', 'CLEARALQ']:
properties_dict[nexus_keyword_to_attribute_name(constraint.get_keyword_mapping(), token_value)] = True
nones_overwrite = True
# break out of the while loop as the next value will not be there
break
elif token_value == 'ACTIVATE' or token_value == 'DEACTIVATE':
properties_dict.update({'active_node': token_value == 'ACTIVATE'})
trimmed_line = trimmed_line.replace(next_value, "", 1)
next_value = get_next_value(0, [trimmed_line])
if next_value is None:
break
token_value = next_value.upper()
trimmed_line = trimmed_line.replace(next_value, "", 1)
# extract the attribute name for the given nexus constraint token
attribute = property_map[token_value][0]
next_value = get_next_value(0, [trimmed_line])
if next_value is None:
raise ValueError(f'No value found after {token_value} in {line}')
elif next_value == 'MULT':
try:
attribute = property_map[token_value + '_MULT'][0]
except AttributeError:
raise AttributeError(f'Unexpected MULT keyword following {token_value}')
properties_dict[attribute] = True
else:
properties_dict[attribute] = correct_datatypes(next_value, float)
trimmed_line = trimmed_line.replace(next_value, "", 1)
next_value = get_next_value(0, [trimmed_line])
# first check if there are any existing constraints created for the well this timestep
for name_of_node in constraint_names_to_add:
properties_dict['name'] = name_of_node
well_constraints = existing_constraints.get(name_of_node, None)
if well_constraints is not None:
latest_constraint = well_constraints[-1]
if latest_constraint.date == current_date:
latest_constraint.update(properties_dict, nones_overwrite)
nexus_file.add_object_locations(latest_constraint.id, [index + start_line_index])
else:
# otherwise take a copy of the previous constraint and add the additional properties
new_constraint = constraint(properties_dict)
well_constraints.append(new_constraint)
nexus_file.add_object_locations(new_constraint.id, [index + start_line_index])
else:
nexus_constraint = constraint(properties_dict)
existing_constraints[name_of_node] = [nexus_constraint]
nexus_file.add_object_locations(nexus_constraint.id, [index + start_line_index])
def __clear_constraints(token_value, constraint) -> dict[str, None]:
"""Replicates behaviour of the clear keyword in nexus constraints by creating a dictionary filled with
Nones for the relevant parameters.
"""
match token_value:
case 'CLEAR':
constraint_clearing_dict = constraint.get_rate_constraints_map()
constraint_clearing_dict.update(constraint.get_pressure_constraints_map())
constraint_clearing_dict.update(constraint.get_limit_constraints_map())
case 'CLEARQ':
constraint_clearing_dict = constraint.get_rate_constraints_map()
case 'CLEARLIMIT':
constraint_clearing_dict = constraint.get_limit_constraints_map()
case 'CLEARP':
constraint_clearing_dict = constraint.get_pressure_constraints_map()
case 'CLEARALQ':
constraint_clearing_dict = constraint.get_alq_constraints_map()
case _:
constraint_clearing_dict = {}
return {x[0]: None for x in constraint_clearing_dict.values()}