from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Union
import pandas as pd
from ResSimpy.Nexus.DataModels.NexusFile import NexusFile
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_BLACKOIL_PRIMARY_KEYWORDS, PVT_TYPE_KEYWORDS, PVT_KEYWORDS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_EOS_METHODS, PVT_EOSOPTIONS_PRIMARY_WORDS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_EOSOPTIONS_PRIMARY_KEYS_INT, PVT_TABLE_KEYWORDS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_TABLES_WITH_ENDWORDS, PVT_TABLES_WITHOUT_ENDWORDS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_EOSOPTIONS_PRIMARY_KEYS_FLOAT, PVT_EOSOPTIONS_TRANS_KEYS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_EOSOPTIONS_TRANS_TEST_KEYS, PVT_EOSOPTIONS_PHASEID_KEYS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_EOSOPTIONS_TERTIARY_KEYS, PVT_ALL_TABLE_KEYWORDS
from ResSimpy.Nexus.NexusKeywords.pvt_keywords import PVT_UNSAT_TABLE_INDICES
from ResSimpy.Enums.UnitsEnum import UnitSystem, SUnits, TemperatureUnits
from ResSimpy.DynamicProperty import DynamicProperty
from ResSimpy.Utils.factory_methods import get_empty_dict_union, get_empty_list_str, get_empty_eosopt_dict_union
import ResSimpy.Nexus.nexus_file_operations as nfo
[docs]@dataclass(kw_only=True, repr=False) # Doesn't need to write an _init_, _eq_ methods, etc.
class NexusPVTMethod(DynamicProperty):
"""Class to hold Nexus PVT properties.
Attributes:
file (NexusFile): Nexus PVT file object
input_number (int): PVT method number in Nexus fcs file
pvt_type (Optional[str]): Type of PVT method, e.g., BLACKOIL, GASWATER or EOS. Defaults to None
eos_nhc (Optional[int]): Number of hydrocarbon components. Defaults to None
eos_temp (Optional[float]): Default temperature for EOS method. Defaults to None
eos_components (Optional[list[str]]): Specifies component names
eos_options (dict[str, Union[str, int, float, pd.DataFrame, list[str], dict[str, float],
tuple[str, dict[str, float]], dict[str, pd.DataFrame]]]): Dictionary containing various EOS options
as specified in the PVT file. Defaults to empty dictionary.
properties (dict[str, Union[str, int, float, Enum, list[str], pd.DataFrame,
dict[str, Union[float, pd.DataFrame]]]]):
Dictionary holding all properties for a specific PVT method. Defaults to empty dictionary.
"""
# General parameters
file: NexusFile
pvt_type: Optional[str] = None
eos_nhc: Optional[int] = None # Number of hydrocarbon components
eos_temp: Optional[float] = None # Default temperature for EOS method
eos_components: Optional[list[str]] = field(default_factory=get_empty_list_str)
eos_options: dict[str, Union[
str, int, float, pd.DataFrame, list[str], dict[str, float], tuple[str, dict[str, float]], dict[
str, pd.DataFrame]]] \
= field(default_factory=get_empty_eosopt_dict_union)
properties: dict[str, Union[str, int, float, Enum, list[str],
pd.DataFrame, dict[str, Union[float, pd.DataFrame]]]] \
= field(default_factory=get_empty_dict_union)
[docs] def __init__(self, file: NexusFile, input_number: int, pvt_type: Optional[str] = None,
eos_nhc: Optional[int] = None, eos_temp: Optional[float] = None,
eos_components: Optional[list[str]] = None,
eos_options: Optional[dict[str, Union[str, int, float, pd.DataFrame, list[str], dict[str, float],
tuple[str, dict[str, float]], dict[str, pd.DataFrame]]]] = None,
properties: Optional[dict[str, Union[str, int, float, Enum, list[str], pd.DataFrame,
dict[str, Union[float, pd.DataFrame]]]]] = None) -> None:
if pvt_type is not None:
self.pvt_type = pvt_type
if eos_nhc is not None:
self.eos_nhc = eos_nhc
if eos_temp is not None:
self.eos_temp = eos_temp
if eos_components is not None:
self.eos_components = eos_components
else:
self.eos_components = []
if eos_options is not None:
self.eos_options = eos_options
else:
self.eos_options = {}
if properties is not None:
self.properties = properties
else:
self.properties = {}
super().__init__(input_number=input_number, file=file)
[docs] def to_string(self) -> str:
"""Create string with PVT data in Nexus file format."""
printable_str = ''
pvt_dict = self.properties
# Print description if present
if 'DESC' in pvt_dict.keys() and isinstance(pvt_dict['DESC'], list):
for desc_line in pvt_dict['DESC']:
printable_str += 'DESC ' + desc_line + '\n'
# Print PVT type and associated properties
printable_str += f'{self.pvt_type}'
if self.eos_nhc is not None:
printable_str += f' NHC {self.eos_nhc}'
for pvt_key in PVT_BLACKOIL_PRIMARY_KEYWORDS:
if pvt_key in pvt_dict.keys():
printable_str += f' {pvt_key} {pvt_dict[pvt_key]}'
printable_str += '\n'
if 'DRYGAS_MFP' in pvt_dict.keys():
printable_str += 'DRYGAS_MFP\n'
# Print EOS-specific required parameters, if present
if self.eos_components is not None:
if len(self.eos_components) > 0:
printable_str += f"COMPONENTS {' '.join(self.eos_components)}\n"
if self.eos_temp is not None:
printable_str += f'TEMP {self.eos_temp}\n'
for key, value in pvt_dict.items():
if isinstance(value, pd.DataFrame):
printable_str += f'{key}\n'
if key == 'IGS_CP':
printable_str += value.to_string(na_rep='', index=False, header=False) + '\n'
else:
printable_str += value.to_string(na_rep='', index=False) + '\n'
if key in PVT_TABLES_WITH_ENDWORDS:
printable_str += 'END'+key+'\n'
printable_str += '\n'
elif isinstance(value, dict):
for subkey in value.keys():
printable_str += f"{key.replace('_',' ')} {subkey}\n"
df = value[subkey]
if isinstance(df, pd.DataFrame):
printable_str += df.to_string(na_rep='', index=False) + '\n'
printable_str += '\n'
elif isinstance(value, Enum):
if isinstance(value, UnitSystem) or isinstance(value, TemperatureUnits):
printable_str += f'{value.value}\n'
elif isinstance(value, SUnits):
printable_str += f'SUNITS {value.value}\n'
elif key not in [*PVT_BLACKOIL_PRIMARY_KEYWORDS, 'NHC', 'DESC', 'DRYGAS_MFP']:
if value == '':
printable_str += f'{key}\n'
else:
printable_str += f'{key} {value}\n'
if len(self.eos_options.keys()) > 0:
pvt_dict = self.eos_options
printable_str += 'EOSOPTIONS'
if 'EOS_METHOD' in pvt_dict.keys():
printable_str += f" {pvt_dict['EOS_METHOD']}"
printable_str += '\n'
for key, value in pvt_dict.items():
if key == 'EOS_OPT_PRIMARY_LIST' and isinstance(value, list):
for token in value:
printable_str += f' {token}\n'
elif isinstance(value, tuple):
printable_str += f' {key} {value[0]}'
val_dict: dict[str, float] = value[1]
for val_key, val_val in val_dict.items():
printable_str += f' {val_key} {val_val}'
printable_str += '\n'
elif isinstance(value, dict):
printable_str += f' {key}'
for subkey, subvalue in value.items():
printable_str += f' {subkey} {subvalue}'
printable_str += '\n'
elif key not in ['EOS_METHOD']:
printable_str += f' {key} {value}\n'
return printable_str
def __populate_eos_opts_to_tertiary_keys(self, primary_key: str, primary_key_default_val: str, single_line: str,
line_list: list[str], list_of_secondary_keys: list[str]):
"""Utility function to populate complex EOS options structures, from primary to tertiary keyword level.
Applies to TRANSITION, TRANS_TEST and PHASEID Nexus EOS options.
Args:
primary_key (str): primary keyword, e.g., TRANSITION or PHASEID
primary_key_default_val (str): default secondary keyword, or primary key value, e.g., TEST
single_line (str): single line as read from input PVT file
line_list (list[str]): list of strings that comprise input PVT file
list_of_secondary_keys (list[str]): list of secondary keywords associated with the given primary keyword
"""
if nfo.check_token(primary_key, single_line):
self.eos_options[primary_key] = primary_key_default_val # Set default value
if nfo.get_expected_token_value(primary_key, single_line, line_list) in list_of_secondary_keys:
self.eos_options[primary_key] = nfo.get_expected_token_value(primary_key, single_line, line_list)
if [i for i in single_line.split() if i in list_of_secondary_keys]:
for secondary_key in list_of_secondary_keys:
if nfo.check_token(secondary_key, single_line):
self.eos_options[primary_key] = secondary_key
for tertiary_key in PVT_EOSOPTIONS_TERTIARY_KEYS:
if nfo.check_token(tertiary_key, single_line):
if isinstance(self.eos_options[primary_key], str): # Convert to tuple
self.eos_options[primary_key] = (secondary_key, {})
secondary_eos_option = self.eos_options[primary_key]
if not isinstance(secondary_eos_option, tuple) or \
not isinstance(secondary_eos_option[1], dict):
raise ValueError(f"EOS secondary key invalid: {secondary_key}")
secondary_eos_option[1][tertiary_key] = float(
nfo.get_expected_token_value(tertiary_key, single_line, line_list))
def __find_pvt_table_starting_index(self, table_key: str, single_line: str, line_list: list[str],
table_indices: dict[str, list[int]],
table_indices_dict: dict[str, dict[str, list[int]]],
table_flag: dict[str, bool], l_index: int,
unsat_obj: dict[str, list[str]] | None = None) -> Optional[int]:
"""Utility function to find the starting line index of a specified PVT table.
Args:
table_key (str): specified PVT table name or undersaturated index, such as, PSAT or RSSAT or PRES
single_line (str): single line as read from input PVT file
line_list (list[str]): list of strings that comprise input PVT file
table_indices ([dict[str, list[int]]): dictionary to store the
starting and ending line index of tables
table_indices_dict (dict[str, dict[str, list[int]]]): dictionary to store the
starting and ending line index of tables, for undersaturated tables
table_flag (bool): flag to tell if a table is currently being read (True) or not (False)
l_index (int): current line index
unsat_obj (dict[str, list[str]]): track saturation pressures from which undersaturated branches emanate
Raises:
ValueError: If a property table key does not have a numerical value
Returns:
int: Updated line index
"""
if unsat_obj is None:
unsat_obj = {}
if table_key not in PVT_UNSAT_TABLE_INDICES: # All tables except undersaturated tables
if nfo.check_token(table_key, single_line):
table_indices[table_key] = [l_index + 1, len(line_list)]
table_flag[table_key] = True
return l_index + 1
else: # Handle undersaturated tables
if table_key == 'PRES':
table_name = 'UNSATGAS'
full_table_name = table_name + '_PRES'
else:
table_name = 'UNSATOIL'
full_table_name = table_name + '_' + table_key
if nfo.check_token(table_name, single_line) and nfo.check_token(table_key, single_line):
if nfo.get_token_value(table_key, single_line, line_list) is None:
raise ValueError(f'Property {table_key} does not have a numerical value.')
unsat_obj[table_key].append(nfo.get_expected_token_value(table_key, single_line, line_list))
if full_table_name in table_indices_dict.keys():
table_indices_dict[full_table_name][unsat_obj[table_key][-1]] = [l_index + 1, len(line_list)]
else:
table_indices_dict[full_table_name] = {unsat_obj[table_key][-1]: [l_index + 1, len(line_list)]}
table_flag[table_name] = True
return l_index + 1
return None
def __find_pvt_table_ending_index(self, table_key: str, single_line: str,
table_indices: dict[str, list[int]],
table_indices_dict: dict[str, dict[str, list[int]]],
l_index: int, table_flag: dict[str, bool],
table_has_endkeyword: bool, reading_flag: bool,
unsat_obj: dict[str, list[str]] = {}) -> bool:
"""Utility function to find the ending line index of a specified PVT table.
Args:
table_key (str): specified PVT table name or undersaturated index, such as, PSAT or RSSAT or PRES
single_line (str): single line as read from input PVT file
table_indices ([dict[str, list[int]]): dictionary to store the
starting and ending line index of tables
table_indices_dict (dict[str, dict[str, list[int]]]): dictionary to store the
starting and ending line index of tables, for undersaturated tables
l_index (int): current line index
table_flag (dict[str, bool]): flag to tell if a table is currently being read (True) or not (False)
table_has_endkeyword (bool): True if table name, e.g., PROPS, has end keyword, i.e., ENDPROPS, else False
reading_flag (bool): True if any table is being read, otherwise False
unsat_obj (dict[str, list[str]]): track saturation pressures from which undersaturated branches emanate
Returns:
bool: True if still reading table, but if identified the ending line index, return False
"""
end_flag_found = False
if table_key not in PVT_UNSAT_TABLE_INDICES: # All tables except undersaturated tables
table_name = table_key
full_table_name = table_key
else: # Handle undersaturated tables
if table_key == 'PRES':
table_name = 'UNSATGAS'
full_table_name = table_name + '_PRES'
else:
table_name = 'UNSATOIL'
full_table_name = table_name + '_' + table_key
# if not table_has_endkeyword and [i for i in single_line.split() if i in PVT_KEYWORDS]:
if not table_has_endkeyword:
for keyword in PVT_KEYWORDS:
if nfo.check_token(keyword, single_line):
end_flag_found = True
if table_has_endkeyword and nfo.check_token('END' + table_name, single_line):
end_flag_found = True
if (full_table_name in table_indices.keys() or full_table_name in table_indices_dict.keys()) and \
end_flag_found and table_flag[table_name]:
if table_key not in PVT_UNSAT_TABLE_INDICES: # All tables except undersaturated tables
table_indices[table_name][1] = l_index
else: # Handle undersaturated tables
table_indices_dict[full_table_name][unsat_obj[table_key][-1]][1] = l_index
table_flag[table_name] = False
reading_flag = False
return reading_flag
[docs] def read_properties(self) -> None:
"""Read Nexus PVT file contents and populate the NexusPVTMethod object."""
file_as_list = self.file.get_flat_list_str_file
# Check for common input data
nfo.check_for_and_populate_common_input_data(file_as_list, self.properties)
# Initialize flags and containers used to record properties as we iterate through pvt file contents
# Dictionary to record start and ending indices for tables
pvt_table_indices: dict[str, list[int]] = {}
pvt_table_indices_dict: dict[str, dict[str, list[int]]] = {}
# Flag to tell when to start reading a table
start_reading_table: bool = False
trans_flag = False
# Dictionary of flags indicating which tables are being read
table_being_read: dict[str, bool] = {}
for table_name in PVT_ALL_TABLE_KEYWORDS:
table_being_read[table_name] = False
# Dictionary of lists to track saturation pressures from which undersaturated branches emanate
pvt_unsat_keys: dict[str, list[str]] = {}
for indx in PVT_UNSAT_TABLE_INDICES:
pvt_unsat_keys[indx] = []
line_indx = 0
for line in file_as_list:
# Determine PVT type, i.e., BLACKOIL, WATEROIL, EOS, etc.
for pvt_type in PVT_TYPE_KEYWORDS:
if nfo.check_token(pvt_type, line):
self.pvt_type = pvt_type
# Extract blackoil fluid density parameters
for fluid_param in PVT_BLACKOIL_PRIMARY_KEYWORDS:
if nfo.check_token(fluid_param, line):
self.properties[fluid_param] = float(nfo.get_expected_token_value(
fluid_param, line, file_as_list, custom_message=f"Property {fluid_param} does \
not have a numerical value."))
if nfo.check_token('DRYGAS_MFP', line):
self.properties['DRYGAS_MFP'] = True
# For EOS or compositional models, get required parameters
if nfo.check_token('NHC', line): # Get number of components
self.eos_nhc = int(nfo.get_expected_token_value('NHC', line,
file_as_list,
custom_message="Property NHC does not \
have a numerical value."))
if nfo.check_token('COMPONENTS', line): # Get NHC components
elems = line.split()
components_index = elems.index('COMPONENTS')
if self.eos_nhc and self.eos_nhc > 0:
self.eos_components = elems[components_index + 1:components_index + 1 + int(self.eos_nhc)]
if nfo.check_token('TEMP', line): # Get default EOS temperature
self.eos_temp = float(nfo.get_expected_token_value(
'TEMP', line, file_as_list, custom_message="Property TEMP does not have a numerical value."))
# Check for EOS options
if nfo.check_token('EOSOPTIONS', line):
if nfo.get_expected_token_value('EOSOPTIONS', line, file_as_list) in PVT_EOS_METHODS:
self.eos_options['EOS_METHOD'] = nfo.get_expected_token_value('EOSOPTIONS', line, file_as_list)
else:
self.eos_options['EOS_METHOD'] = 'PR'
# Find EOS single-word options, like CAPILLARYFLASH and add to list
if [i for i in line.split() if i in PVT_EOSOPTIONS_PRIMARY_WORDS]:
if 'EOS_OPT_PRIMARY_LIST' not in self.eos_options.keys():
self.eos_options['EOS_OPT_PRIMARY_LIST'] = []
if not isinstance(self.eos_options['EOS_OPT_PRIMARY_LIST'], list):
raise ValueError(f"EOS_OPT_PRIMARY_LIST should be a list, instead \
got {self.eos_options['EOS_OPT_PRIMARY_LIST']}")
self.eos_options['EOS_OPT_PRIMARY_LIST'].extend([i for i in line.split() if i
in PVT_EOSOPTIONS_PRIMARY_WORDS])
# Find EOS key-value pairs, like LI_FACT 0.9 or FUGERR 5
if [i for i in line.split() if i in PVT_EOSOPTIONS_PRIMARY_KEYS_FLOAT]:
for key in PVT_EOSOPTIONS_PRIMARY_KEYS_FLOAT:
if nfo.check_token(key, line):
self.eos_options[key] = float(nfo.get_expected_token_value(key, line, file_as_list))
if [i for i in line.split() if i in PVT_EOSOPTIONS_PRIMARY_KEYS_INT]:
for key in PVT_EOSOPTIONS_PRIMARY_KEYS_INT:
if nfo.check_token(key, line):
self.eos_options[key] = int(nfo.get_expected_token_value(key, line, file_as_list))
# Read TRANSITION, TRANS_TEST and PHASEID eos options, if present
primary_keys2populate = ['TRANSITION', 'TRANS_TEST', 'PHASEID']
primary_keys2populate_defaults = ['TEST', 'INCRP', '']
secondary_keys = [PVT_EOSOPTIONS_TRANS_KEYS, PVT_EOSOPTIONS_TRANS_TEST_KEYS, PVT_EOSOPTIONS_PHASEID_KEYS]
if [i for i in line.split() if i in primary_keys2populate]:
trans_flag = True
if trans_flag:
for index in range(len(primary_keys2populate)):
pkey = primary_keys2populate[index]
p2key = primary_keys2populate_defaults[index]
sec_key = secondary_keys[index]
self.__populate_eos_opts_to_tertiary_keys(pkey, p2key, line, file_as_list, sec_key)
# Read TRANS_OPTIMIZATION eos options, if present
if nfo.check_token('TRANS_OPTIMIZATION', line):
new_dict: dict[str, float] = {}
for tert_key in PVT_EOSOPTIONS_TERTIARY_KEYS:
if nfo.check_token(tert_key, line):
potential_value = float(nfo.get_expected_token_value(tert_key, line, file_as_list))
if isinstance(potential_value, float):
new_dict[tert_key] = potential_value
self.eos_options['TRANS_OPTIMIZATION'] = new_dict
# Identify beginning and ending line indices for different kinds tables in PVT file
if start_reading_table: # Figure out ending line indices for tables
for table in PVT_TABLES_WITHOUT_ENDWORDS:
start_reading_table = self.__find_pvt_table_ending_index(table, line, pvt_table_indices,
pvt_table_indices_dict,
line_indx, table_being_read, False,
start_reading_table)
for table in PVT_TABLES_WITH_ENDWORDS:
start_reading_table = self.__find_pvt_table_ending_index(table, line, pvt_table_indices,
pvt_table_indices_dict,
line_indx, table_being_read, True,
start_reading_table)
for table in PVT_UNSAT_TABLE_INDICES:
start_reading_table = self.__find_pvt_table_ending_index(table, line, pvt_table_indices,
pvt_table_indices_dict,
line_indx, table_being_read, False,
start_reading_table, pvt_unsat_keys)
# Figure out beginning line indices for tables
table_found = False
for table in PVT_TABLE_KEYWORDS + PVT_UNSAT_TABLE_INDICES:
if table in PVT_UNSAT_TABLE_INDICES: # Work on undersaturated tables
new_line_indx = self.__find_pvt_table_starting_index(table, line, file_as_list, pvt_table_indices,
pvt_table_indices_dict, table_being_read,
line_indx, pvt_unsat_keys)
else: # All other tables
new_line_indx = self.__find_pvt_table_starting_index(table, line, file_as_list, pvt_table_indices,
pvt_table_indices_dict, table_being_read,
line_indx)
if new_line_indx is not None:
line_indx = new_line_indx
table_found = True
break
if table_found:
continue
# Check if this line represents the header of a PVT table
table_header_row_options = ['PRES', 'DP', 'RV', 'INDEX', 'COMPONENT']
header_row_flag = False
for hr in table_header_row_options:
if nfo.check_token(hr, line):
header_row_flag = True
reading_a_table_flag = False
for table_name in PVT_ALL_TABLE_KEYWORDS:
if table_being_read[table_name]:
reading_a_table_flag = True
if header_row_flag and reading_a_table_flag and not start_reading_table:
start_reading_table = True # If header row, start reading
line_indx += 1
# Read in tables
for key in pvt_table_indices.keys():
if key == 'IGS_CP':
self.properties[key] = nfo.read_table_to_df(file_as_list[
pvt_table_indices[key][0]:pvt_table_indices[key][1]], noheader=True)
else:
self.properties[key] = nfo.read_table_to_df(file_as_list[
pvt_table_indices[key][0]:pvt_table_indices[key][1]])
for key in pvt_table_indices_dict.keys():
self.properties[key] = {}
for subkey in pvt_table_indices_dict[key].keys():
property_key = self.properties[key]
if not isinstance(property_key, dict):
raise ValueError(f"Property is not a dictionary: {str(self.properties[key])}")
property_key[subkey] = nfo.read_table_to_df(file_as_list[
pvt_table_indices_dict[key][subkey][0]:
pvt_table_indices_dict[key][subkey][1]])