Skip to content

Core Classes

Core classes for HEC-RAS project management and execution.

Important Notes

Static Class Pattern

All primary classes use static methods - do NOT instantiate:

Python
# Correct
RasCmdr.compute_plan("01")

# Wrong - will fail
cmd = RasCmdr()
cmd.compute_plan("01")

RASMapper Flag Inversion

When using RasPlan.update_run_flags(), note that RASMapper flags have inverted logic:

  • Standard flags: True = -1, False = 0
  • RASMapper flag: True = 0, False = -1

This is a HEC-RAS quirk, not a library bug.

Input Flexibility

Most methods accept multiple input types via @standardize_input:

Python
# All valid for HDF methods:
HdfResultsMesh.get_mesh_max_ws("01")           # Plan number
HdfResultsMesh.get_mesh_max_ws(1)              # Integer
HdfResultsMesh.get_mesh_max_ws(Path("x.hdf")) # Path object

Project Management

init_ras_project

init_ras_project

Python
init_ras_project(ras_project_folder, ras_version=None, ras_object=None, load_results_summary=True, hide_intro=False) -> RasPrj

Initialize a RAS project for use with the ras-commander library.

This is the primary function for setting up a HEC-RAS project. It: 1. Finds the project file (.prj) in the specified folder OR uses the provided .prj file 2. Validates .prj files by checking for "Proj Title=" marker 3. Identifies the appropriate HEC-RAS executable 4. Loads project data (plans, geometries, flows) 5. Creates dataframes containing project components 6. Loads HDF results summaries (if load_results_summary=True)

Parameters:

Name Type Description Default
ras_project_folder str or Path

Path to the RAS project folder OR direct path to a .prj file. If a .prj file is provided: - File is validated to have .prj extension - File content is checked for "Proj Title=" marker - Parent folder is used as the project folder

required
ras_version str

The version of RAS to use (e.g., "7.0") OR a full path to the Ras.exe file (e.g., "D:/Programs/HEC/HEC-RAS/6.6/Ras.exe"). If None, will attempt to detect from plan files.

None
ras_object RasPrj

If None, updates the global 'ras' object. If a RasPrj instance, updates that instance. If any other value, creates and returns a new RasPrj instance.

None
load_results_summary bool, default=True

If True, populate results_df with lightweight HDF results summaries at initialization. This enables quick queries of execution status and basic results via ras.results_df without needing to re-scan HDF files. Set to False for faster initialization when results are not needed.

True
hide_intro bool, default=False

If True, suppress the agent intro banner that is printed after initialization. The banner provides API guidance for AI agents using the library.

False

Returns:

Name Type Description
RasPrj RasPrj

An initialized RasPrj instance.

Raises:

Type Description
FileNotFoundError

If the specified project folder or .prj file doesn't exist.

ValueError

If the provided file is not a .prj file, does not contain "Proj Title=", or if no HEC-RAS project file is found in the folder.

Example

Initialize using project folder (existing behavior)

init_ras_project("/path/to/project", "7.0") print(f"Initialized project: {ras.project_name}")

Initialize using direct .prj file path (new feature)

init_ras_project("/path/to/project/MyModel.prj", "7.0") print(f"Initialized project: {ras.project_name}")

Create a new RasPrj instance with .prj file

my_project = init_ras_project("/path/to/project/MyModel.prj", "7.0", "new") print(f"Created project instance: {my_project.project_name}")

Skip results loading for faster initialization

init_ras_project("/path/to/project", "7.0", load_results_summary=False)

Source code in ras_commander/RasPrj.py
Python
@log_call
def init_ras_project(
    ras_project_folder,
    ras_version=None,
    ras_object=None,
    load_results_summary=True,
    hide_intro=False
) -> 'RasPrj':
    """
    Initialize a RAS project for use with the ras-commander library.

    This is the primary function for setting up a HEC-RAS project. It:
    1. Finds the project file (.prj) in the specified folder OR uses the provided .prj file
    2. Validates .prj files by checking for "Proj Title=" marker
    3. Identifies the appropriate HEC-RAS executable
    4. Loads project data (plans, geometries, flows)
    5. Creates dataframes containing project components
    6. Loads HDF results summaries (if load_results_summary=True)

    Args:
        ras_project_folder (str or Path): Path to the RAS project folder OR direct path to a .prj file.
                                          If a .prj file is provided:
                                          - File is validated to have .prj extension
                                          - File content is checked for "Proj Title=" marker
                                          - Parent folder is used as the project folder
        ras_version (str, optional): The version of RAS to use (e.g., "7.0") OR
                                     a full path to the Ras.exe file (e.g., "D:/Programs/HEC/HEC-RAS/6.6/Ras.exe").
                                     If None, will attempt to detect from plan files.
        ras_object (RasPrj, optional): If None, updates the global 'ras' object.
                                       If a RasPrj instance, updates that instance.
                                       If any other value, creates and returns a new RasPrj instance.
        load_results_summary (bool, default=True): If True, populate results_df with lightweight
                                                    HDF results summaries at initialization. This
                                                    enables quick queries of execution status and
                                                    basic results via ras.results_df without needing
                                                    to re-scan HDF files. Set to False for faster
                                                    initialization when results are not needed.
        hide_intro (bool, default=False): If True, suppress the agent intro banner that is
                                          printed after initialization. The banner provides
                                          API guidance for AI agents using the library.

    Returns:
        RasPrj: An initialized RasPrj instance.

    Raises:
        FileNotFoundError: If the specified project folder or .prj file doesn't exist.
        ValueError: If the provided file is not a .prj file, does not contain "Proj Title=",
                    or if no HEC-RAS project file is found in the folder.

    Example:
        >>> # Initialize using project folder (existing behavior)
        >>> init_ras_project("/path/to/project", "7.0")
        >>> print(f"Initialized project: {ras.project_name}")
        >>>
        >>> # Initialize using direct .prj file path (new feature)
        >>> init_ras_project("/path/to/project/MyModel.prj", "7.0")
        >>> print(f"Initialized project: {ras.project_name}")
        >>>
        >>> # Create a new RasPrj instance with .prj file
        >>> my_project = init_ras_project("/path/to/project/MyModel.prj", "7.0", "new")
        >>> print(f"Created project instance: {my_project.project_name}")
        >>>
        >>> # Skip results loading for faster initialization
        >>> init_ras_project("/path/to/project", "7.0", load_results_summary=False)
    """
    # Convert to Path object for consistent handling
    # Use safe_resolve to preserve drive letters on Windows mapped network drives
    from .RasUtils import RasUtils
    input_path = RasUtils.safe_resolve(Path(ras_project_folder))

    # Detect if input is a file or folder
    if input_path.is_file():
        # User provided a .prj file path
        if input_path.suffix.lower() != '.prj':
            error_msg = f"The provided file is not a HEC-RAS project file (.prj): {input_path}"
            logger.error(error_msg)
            raise ValueError(f"{error_msg}. Please provide either a project folder or a .prj file.")

        # Enhanced validation: Check if file contains "Proj Title=" to verify it's a HEC-RAS project file
        try:
            content, encoding = read_file_with_fallback_encoding(input_path)
            if content is None or "Proj Title=" not in content:
                error_msg = f"The file does not appear to be a valid HEC-RAS project file (missing 'Proj Title='): {input_path}"
                logger.error(error_msg)
                raise ValueError(f"{error_msg}. Please provide a valid HEC-RAS .prj file.")
            logger.debug(f"Validated .prj file contains 'Proj Title=' marker")
        except Exception as e:
            error_msg = f"Error validating .prj file: {e}"
            logger.error(error_msg)
            raise ValueError(f"{error_msg}. Please ensure the file is a valid HEC-RAS project file.")

        # Extract the parent folder to use as project_folder
        project_folder = input_path.parent
        specified_prj_file = input_path  # Store for optimization
        logger.debug(f"User provided .prj file: {input_path}")
        logger.debug(f"Using parent folder as project_folder: {project_folder}")

    elif input_path.is_dir():
        # User provided a folder path (existing behavior)
        project_folder = input_path
        specified_prj_file = None
        logger.debug(f"User provided folder path: {project_folder}")

    else:
        # Path doesn't exist
        if input_path.suffix.lower() == '.prj':
            error_msg = f"The specified .prj file does not exist: {input_path}"
            logger.error(error_msg)
            raise FileNotFoundError(f"{error_msg}. Please check the path and try again.")
        else:
            error_msg = f"The specified RAS project folder does not exist: {input_path}"
            logger.error(error_msg)
            raise FileNotFoundError(f"{error_msg}. Please check the path and try again.")

    # Determine which RasPrj instance to use
    if ras_object is None:
        # Use the global 'ras' object
        logger.debug("Initializing global 'ras' object via init_ras_project function.")
        ras_object = ras
    elif not isinstance(ras_object, RasPrj):
        # Create a new RasPrj instance
        logger.debug("Creating a new RasPrj instance.")
        ras_object = RasPrj()

    ras_exe_path = None

    # Use version specified by user if provided
    if ras_version is not None:
        ras_exe_path = get_ras_exe(ras_version)
        if ras_exe_path == "Ras.exe" and ras_version != "Ras.exe":
            logger.warning(
                f"HEC-RAS Version {ras_version} was not found. Running HEC-RAS will fail. "
                f"See: https://ras-commander.readthedocs.io/getting-started/installation/"
            )
    else:
        # No version specified, try to detect from plan files
        detected_version = None
        logger.debug("No HEC-RAS version specified. Detecting from plan files.")

        # Look for .pXX files in project folder
        logger.debug(f"Searching for plan files in {project_folder}")
        # Search for any file with .p01 through .p99 extension, regardless of base name
        plan_files = list(project_folder.glob("*.p[0-9][0-9]"))

        if not plan_files:
            logger.debug(f"No plan files found in {project_folder}")

        for plan_file in plan_files:
            logger.debug(f"Found plan file: {plan_file.name}")
            content, encoding = read_file_with_fallback_encoding(plan_file)

            if not content:
                logger.debug(f"Could not read content from {plan_file.name}")
                continue

            logger.debug(f"Successfully read plan file with {encoding} encoding")

            # Look for Program Version in plan file
            for line in content.splitlines():
                if line.startswith("Program Version="):
                    version = line.split("=")[1].strip()
                    logger.debug(f"Found Program Version={version} in {plan_file.name}")

                    # Replace 00 in version string if present
                    if "00" in version:
                        version = version.replace("00", "0")

                    # Try to get RAS executable for this version
                    test_exe_path = get_ras_exe(version)
                    logger.debug(f"Checking RAS executable path: {test_exe_path}")

                    if test_exe_path != "Ras.exe":
                        detected_version = version
                        ras_exe_path = test_exe_path
                        logger.info(f"Detected HEC-RAS version {version} from {plan_file.name}")
                        break
                    else:
                        logger.debug(f"Version {version} not found in default installation path")

            if detected_version:
                break

        if not detected_version:
            logger.error("No valid HEC-RAS version found in any plan files.")
            ras_exe_path = "Ras.exe"
            logger.warning(
                "No valid HEC-RAS version was detected. Running HEC-RAS will fail. "
                "See: https://ras-commander.readthedocs.io/getting-started/installation/"
            )

    # Initialize or re-initialize with the determined executable path
    # Pass specified_prj_file to avoid re-searching when user provided .prj file directly
    if specified_prj_file is not None:
        ras_object.initialize(project_folder, ras_exe_path, prj_file=specified_prj_file,
                              load_results_summary=load_results_summary)
    else:
        ras_object.initialize(project_folder, ras_exe_path,
                              load_results_summary=load_results_summary)

    # Store version for RasControl (legacy COM interface support)
    ras_object.ras_version = ras_version if ras_version else detected_version

    # NOTE: Removed automatic global ras update for thread-safety
    # When ras_object is explicitly passed, we should NOT modify the global ras object
    # This allows multiple threads to use separate RasPrj instances without conflicts
    #
    # Previous behavior (removed):
    #   if ras_object is not ras:
    #       ras.initialize(project_folder, ras_exe_path, ...)  # This was NOT thread-safe
    #
    # If you need to update the global ras object, call init_ras_project without ras_object:
    #   init_ras_project(path, version)  # Updates global ras
    # Or explicitly pass ras_object=None (default behavior)

    # Log CLB Engineering branding banner with version and docs links
    from . import __version__
    logger.info(
        f"ras-commander v{__version__} | "
        f"An open-source project of CLB Engineering Corporation (https://clbengineering.com/) | "
        f"Docs: https://ras-commander.readthedocs.io | "
        f"GitHub: https://github.com/gpt-cmdr/ras-commander"
    )
    logger.info(f"Project initialized: {ras_object.project_name} | Folder: {ras_object.project_folder}")
    logger.info(f"Using HEC-RAS executable: {ras_exe_path}")

    if not hide_intro:
        _obj = "ras" if ras_object is ras else "ras_object"
        logger.info(
            "\n"
            "═══════════════════════════════════════════════════════════════════════\n"
            "ras-commander | HEC-RAS Automation Library\n"
            "Docs: https://rascommander.info/\n"
            "Repo: https://github.com/gpt-cmdr/ras-commander\n"
            "═══════════════════════════════════════════════════════════════════════\n"
            "\n"
            "PROJECT DATAFRAMES (single source of truth — use these, not file globbing):\n"
            f"  {_obj}.plan_df        Plans, HDF paths, geometry/flow associations\n"
            f"  {_obj}.geom_df        Geometry files and HDF preprocessor paths\n"
            f"  {_obj}.flow_df        Steady flow files\n"
            f"  {_obj}.unsteady_df    Unsteady flow files and configurations\n"
            f"  {_obj}.boundaries_df  Boundary conditions (type, name, location)\n"
            f"  {_obj}.results_df     Lightweight HDF results summaries\n"
            f"  {_obj}.rasmap_df      RASMapper layers, terrain, land cover paths\n"
            "\n"
            "KEY APIS (static classes — call directly, never instantiate):\n"
            "  Execution:    RasCmdr.compute_plan() / compute_parallel() / compute_test_mode()\n"
            "  Plan Files:   RasPlan.clone_plan() / clone_geom() / set_geom()\n"
            "  Unsteady:     RasUnsteady — IC/BC management, gate openings, precipitation\n"
            "  Geometry:     GeomCrossSection, GeomBridge, GeomStorage, GeomLateral, GeomMesh\n"
            "  HDF Results:  HdfResultsPlan.get_wse() / get_compute_messages()\n"
            "                HdfResultsMesh.get_mesh_max_ws() / get_mesh_cells_timeseries()\n"
            "                HdfMesh.get_mesh_cell_points()\n"
            "  QA/QC:        RasCheck.run_check() / RasFixit (geometry repair)\n"
            "  DSS:          RasDss.get_timeseries() / check_pathname()\n"
            "  USGS:         UsgsGaugeSpatial, GaugeMatcher, RasUsgsBoundaryGeneration\n"
            "  Precipitation: StormGenerator, Atlas14Storm, PrecipAorc, Atlas14Variance\n"
            "  Terrain:      RasTerrain.create_terrain_hdf() / RasTerrainMod\n"
            "\n"
            "MULTI-PROJECT: Pass ras_object= to all API calls when using local RasPrj instances.\n"
            "\n"
            "EXAMPLES: 100+ notebooks in examples/ (100s=execution, 200s=geometry, 300s=unsteady,\n"
            "  400s=HDF results, 500s=remote, 800s=QA/QC, 900s=data integration).\n"
            "  Review relevant notebooks before assembling new workflows.\n"
            "\n"
            "PLATFORM: Most HEC-RAS operations require Windows. Linux/Wine support for\n"
            "  headless execution, data access, geometry modification, and preprocessing\n"
            "  is available via RasProcess (HEC-RAS 6.6+). See ras_commander/RasProcess.py.\n"
            "  Remote distributed execution: ras_commander/remote/ (PsExec, Docker, SSH, cloud).\n"
            "═══════════════════════════════════════════════════════════════════════"
        )

    return ras_object

RasPrj

RasPrj

RasPrj.py - Manages HEC-RAS projects within the ras-commander library

This module provides a class for managing HEC-RAS projects.

Classes:

Name Description
RasPrj

A class for managing HEC-RAS projects.

Functions:

Name Description
init_ras_project

Initialize a RAS project.

get_ras_exe

Determine the HEC-RAS executable path based on the input.

DEVELOPER NOTE: This class is used to initialize a RAS project and is used in conjunction with the RasCmdr class to manage the execution of RAS plans. By default, the RasPrj class is initialized with the global 'ras' object. However, you can create multiple RasPrj instances to manage multiple projects. Do not mix and match global 'ras' object instances and custom instances of RasPrj - it will cause errors.

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()).

Example

@log_call def my_function():

Text Only
logger.debug("Additional debug information")
# Function logic here

All of the methods in this class are class methods and are designed to be used with instances of the class.

List of Functions in RasPrj: - initialize() - _load_project_data() - _get_geom_file_for_plan() - _parse_plan_file() - _parse_unsteady_file() - _parse_flow_file() - _get_prj_entries() - _parse_boundary_condition() - is_initialized (property) - check_initialized() - find_ras_prj() - get_project_name() - get_prj_entries() - get_plan_entries() - get_flow_entries() - get_unsteady_entries() - get_geom_entries() - get_hdf_entries() - print_data() - get_plan_value() - get_boundary_conditions() - update_results_df() - get_results_entries()

Functions in RasPrj that are not part of the class:
- init_ras_project() - get_ras_exe()

Plan Execution

RasCmdr

RasCmdr

RasCmdr - Execution operations for running HEC-RAS simulations

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()).

Example

@log_call def my_function():

Text Only
logger.debug("Additional debug information")
# Function logic here

All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasCmdr: - compute_plan() - compute_parallel() - compute_test_mode()

Real-Time Execution Monitoring (v0.88.0+)

The stream_callback parameter enables real-time progress monitoring during HEC-RAS execution:

Python
from ras_commander import RasCmdr
from ras_commander.callbacks import ConsoleCallback

# Simple console monitoring
callback = ConsoleCallback(verbose=True)
RasCmdr.compute_plan("01", stream_callback=callback)

Output:

Text Only
[Plan 01] Starting execution...
[Plan 01] Geometry Preprocessor Version 6.6
[Plan 01] Computing Cross Section HTAB's
[Plan 01] Starting Unsteady Flow Computations
[Plan 01] Time: 01JAN2020 0600 [  1.25% Done]
[Plan 01] SUCCESS in 45.2s

Callback Lifecycle

Callbacks receive notifications at key execution points:

  1. on_prep_start(plan_number) - Before geometry preprocessing
  2. on_prep_complete(plan_number) - After preprocessing
  3. on_exec_start(plan_number, command) - When HEC-RAS subprocess starts
  4. on_exec_message(plan_number, message) - Each .bco file message (real-time)
  5. on_exec_complete(plan_number, success, duration) - After execution
  6. on_verify_result(plan_number, verified) - After HDF verification (if verify=True)
Built-in Callbacks
ConsoleCallback

Simple callback that prints execution progress to console.

This is the simplest possible callback implementation, suitable for: - Interactive sessions - Debugging - Quick scripts

Thread Safety

Uses print() with file argument for atomic writes. Safe for concurrent use in compute_parallel().

Example

from ras_commander import RasCmdr from ras_commander.callbacks import ConsoleCallback

callback = ConsoleCallback() RasCmdr.compute_plan("01", stream_callback=callback) [Plan 01] Starting execution... [Plan 01] Geometry Preprocessor Version 6.6 [Plan 01] SUCCESS in 45.2s

Source code in ras_commander/callbacks.py
Python
class ConsoleCallback:
    """
    Simple callback that prints execution progress to console.

    This is the simplest possible callback implementation, suitable for:
    - Interactive sessions
    - Debugging
    - Quick scripts

    Thread Safety:
        Uses print() with file argument for atomic writes.
        Safe for concurrent use in compute_parallel().

    Example:
        >>> from ras_commander import RasCmdr
        >>> from ras_commander.callbacks import ConsoleCallback
        >>>
        >>> callback = ConsoleCallback()
        >>> RasCmdr.compute_plan("01", stream_callback=callback)
        [Plan 01] Starting execution...
        [Plan 01] Geometry Preprocessor Version 6.6
        [Plan 01] SUCCESS in 45.2s
    """

    def __init__(self, verbose: bool = False):
        """
        Initialize console callback.

        Args:
            verbose: If True, print all messages. If False, only print start/complete.
        """
        self.verbose = verbose

    def on_exec_start(self, plan_number: str, command: str) -> None:
        """Print execution start message."""
        print(f"[Plan {plan_number}] Starting execution...", file=sys.stdout, flush=True)
        if self.verbose:
            print(f"[Plan {plan_number}] Command: {command}", file=sys.stdout, flush=True)

    def on_exec_message(self, plan_number: str, message: str) -> None:
        """Print execution messages (if verbose mode enabled)."""
        if self.verbose:
            print(f"[Plan {plan_number}] {message.strip()}", file=sys.stdout, flush=True)

    def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
        """Print execution completion message."""
        status = "SUCCESS" if success else "FAILED"
        print(f"[Plan {plan_number}] {status} in {duration:.1f}s", file=sys.stdout, flush=True)
__init__
Python
__init__(verbose: bool = False)

Initialize console callback.

Parameters:

Name Type Description Default
verbose bool

If True, print all messages. If False, only print start/complete.

False
Source code in ras_commander/callbacks.py
Python
def __init__(self, verbose: bool = False):
    """
    Initialize console callback.

    Args:
        verbose: If True, print all messages. If False, only print start/complete.
    """
    self.verbose = verbose
on_exec_start
Python
on_exec_start(plan_number: str, command: str) -> None

Print execution start message.

Source code in ras_commander/callbacks.py
Python
def on_exec_start(self, plan_number: str, command: str) -> None:
    """Print execution start message."""
    print(f"[Plan {plan_number}] Starting execution...", file=sys.stdout, flush=True)
    if self.verbose:
        print(f"[Plan {plan_number}] Command: {command}", file=sys.stdout, flush=True)
on_exec_message
Python
on_exec_message(plan_number: str, message: str) -> None

Print execution messages (if verbose mode enabled).

Source code in ras_commander/callbacks.py
Python
def on_exec_message(self, plan_number: str, message: str) -> None:
    """Print execution messages (if verbose mode enabled)."""
    if self.verbose:
        print(f"[Plan {plan_number}] {message.strip()}", file=sys.stdout, flush=True)
on_exec_complete
Python
on_exec_complete(plan_number: str, success: bool, duration: float) -> None

Print execution completion message.

Source code in ras_commander/callbacks.py
Python
def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
    """Print execution completion message."""
    status = "SUCCESS" if success else "FAILED"
    print(f"[Plan {plan_number}] {status} in {duration:.1f}s", file=sys.stdout, flush=True)
FileLoggerCallback

Callback that writes execution progress to per-plan log files.

Creates a separate log file for each plan, enabling: - Detailed execution logs - Post-execution analysis - Archival records

Thread Safety

Uses threading.Lock to ensure thread-safe file operations. Safe for concurrent use in compute_parallel().

Example

from pathlib import Path from ras_commander.callbacks import FileLoggerCallback

callback = FileLoggerCallback(output_dir=Path("logs")) RasCmdr.compute_plan("01", stream_callback=callback)

Creates logs/plan_01_execution.log with full details
Source code in ras_commander/callbacks.py
Python
class FileLoggerCallback:
    """
    Callback that writes execution progress to per-plan log files.

    Creates a separate log file for each plan, enabling:
    - Detailed execution logs
    - Post-execution analysis
    - Archival records

    Thread Safety:
        Uses threading.Lock to ensure thread-safe file operations.
        Safe for concurrent use in compute_parallel().

    Example:
        >>> from pathlib import Path
        >>> from ras_commander.callbacks import FileLoggerCallback
        >>>
        >>> callback = FileLoggerCallback(output_dir=Path("logs"))
        >>> RasCmdr.compute_plan("01", stream_callback=callback)
        # Creates logs/plan_01_execution.log with full details
    """

    def __init__(self, output_dir: Path):
        """
        Initialize file logger callback.

        Args:
            output_dir: Directory for log files (created if doesn't exist)
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.log_handles = {}
        self.lock = Lock()
        logger.info(f"FileLoggerCallback initialized: {self.output_dir}")

    def on_exec_start(self, plan_number: str, command: str) -> None:
        """Open log file for this plan."""
        with self.lock:
            log_file = self.output_dir / f"plan_{plan_number}_execution.log"
            self.log_handles[plan_number] = open(log_file, 'w', encoding='utf-8')
            self.log_handles[plan_number].write(f"=== Plan {plan_number} Execution Log ===\n")
            self.log_handles[plan_number].write(f"Command: {command}\n")
            self.log_handles[plan_number].write("=" * 80 + "\n\n")
            self.log_handles[plan_number].flush()

    def on_exec_message(self, plan_number: str, message: str) -> None:
        """Write message to plan's log file."""
        with self.lock:
            if plan_number in self.log_handles:
                self.log_handles[plan_number].write(message + '\n')
                self.log_handles[plan_number].flush()

    def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
        """Write completion message and close log file."""
        with self.lock:
            if plan_number in self.log_handles:
                status = "SUCCESS" if success else "FAILED"
                self.log_handles[plan_number].write("\n" + "=" * 80 + "\n")
                self.log_handles[plan_number].write(f"Execution {status} in {duration:.1f} seconds\n")
                self.log_handles[plan_number].close()
                del self.log_handles[plan_number]

    def __del__(self):
        """Cleanup: close any remaining open file handles."""
        with self.lock:
            for handle in self.log_handles.values():
                try:
                    handle.close()
                except:
                    pass
__init__
Python
__init__(output_dir: Path)

Initialize file logger callback.

Parameters:

Name Type Description Default
output_dir Path

Directory for log files (created if doesn't exist)

required
Source code in ras_commander/callbacks.py
Python
def __init__(self, output_dir: Path):
    """
    Initialize file logger callback.

    Args:
        output_dir: Directory for log files (created if doesn't exist)
    """
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)
    self.log_handles = {}
    self.lock = Lock()
    logger.info(f"FileLoggerCallback initialized: {self.output_dir}")
on_exec_start
Python
on_exec_start(plan_number: str, command: str) -> None

Open log file for this plan.

Source code in ras_commander/callbacks.py
Python
def on_exec_start(self, plan_number: str, command: str) -> None:
    """Open log file for this plan."""
    with self.lock:
        log_file = self.output_dir / f"plan_{plan_number}_execution.log"
        self.log_handles[plan_number] = open(log_file, 'w', encoding='utf-8')
        self.log_handles[plan_number].write(f"=== Plan {plan_number} Execution Log ===\n")
        self.log_handles[plan_number].write(f"Command: {command}\n")
        self.log_handles[plan_number].write("=" * 80 + "\n\n")
        self.log_handles[plan_number].flush()
on_exec_message
Python
on_exec_message(plan_number: str, message: str) -> None

Write message to plan's log file.

Source code in ras_commander/callbacks.py
Python
def on_exec_message(self, plan_number: str, message: str) -> None:
    """Write message to plan's log file."""
    with self.lock:
        if plan_number in self.log_handles:
            self.log_handles[plan_number].write(message + '\n')
            self.log_handles[plan_number].flush()
on_exec_complete
Python
on_exec_complete(plan_number: str, success: bool, duration: float) -> None

Write completion message and close log file.

Source code in ras_commander/callbacks.py
Python
def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
    """Write completion message and close log file."""
    with self.lock:
        if plan_number in self.log_handles:
            status = "SUCCESS" if success else "FAILED"
            self.log_handles[plan_number].write("\n" + "=" * 80 + "\n")
            self.log_handles[plan_number].write(f"Execution {status} in {duration:.1f} seconds\n")
            self.log_handles[plan_number].close()
            del self.log_handles[plan_number]
__del__
Python
__del__()

Cleanup: close any remaining open file handles.

Source code in ras_commander/callbacks.py
Python
def __del__(self):
    """Cleanup: close any remaining open file handles."""
    with self.lock:
        for handle in self.log_handles.values():
            try:
                handle.close()
            except:
                pass
ProgressBarCallback

Callback that displays tqdm progress bars for execution.

Shows real-time progress with: - Per-plan progress bar - Last message displayed - Execution time tracking

Thread Safety

Uses threading.Lock to ensure thread-safe tqdm operations. Safe for concurrent use in compute_parallel().

Requirements

Requires tqdm package: pip install tqdm

Example

from ras_commander.callbacks import ProgressBarCallback

callback = ProgressBarCallback() RasCmdr.compute_plan("01", stream_callback=callback) Plan 01: 100%|████████████| 1234/1234 [00:45<00:00, 27.42msg/s]

Source code in ras_commander/callbacks.py
Python
class ProgressBarCallback:
    """
    Callback that displays tqdm progress bars for execution.

    Shows real-time progress with:
    - Per-plan progress bar
    - Last message displayed
    - Execution time tracking

    Thread Safety:
        Uses threading.Lock to ensure thread-safe tqdm operations.
        Safe for concurrent use in compute_parallel().

    Requirements:
        Requires tqdm package: pip install tqdm

    Example:
        >>> from ras_commander.callbacks import ProgressBarCallback
        >>>
        >>> callback = ProgressBarCallback()
        >>> RasCmdr.compute_plan("01", stream_callback=callback)
        Plan 01: 100%|████████████| 1234/1234 [00:45<00:00, 27.42msg/s]
    """

    def __init__(self):
        """Initialize progress bar callback."""
        if not TQDM_AVAILABLE:
            raise ImportError(
                "ProgressBarCallback requires tqdm package. "
                "Install with: pip install tqdm"
            )
        self.pbars = {}
        self.lock = Lock()

    def on_exec_start(self, plan_number: str, command: str) -> None:
        """Create progress bar for this plan."""
        with self.lock:
            self.pbars[plan_number] = tqdm(
                desc=f"Plan {plan_number}",
                unit="msg",
                dynamic_ncols=True
            )

    def on_exec_message(self, plan_number: str, message: str) -> None:
        """Update progress bar with new message."""
        with self.lock:
            if plan_number in self.pbars:
                # Show last 50 characters of message
                short_message = message.strip()[-50:]
                self.pbars[plan_number].set_postfix_str(short_message)
                self.pbars[plan_number].update(1)

    def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
        """Close progress bar and display final status."""
        with self.lock:
            if plan_number in self.pbars:
                status = "SUCCESS" if success else "FAILED"
                self.pbars[plan_number].set_postfix_str(f"{status} in {duration:.1f}s")
                self.pbars[plan_number].close()
                del self.pbars[plan_number]
__init__
Python
__init__()

Initialize progress bar callback.

Source code in ras_commander/callbacks.py
Python
def __init__(self):
    """Initialize progress bar callback."""
    if not TQDM_AVAILABLE:
        raise ImportError(
            "ProgressBarCallback requires tqdm package. "
            "Install with: pip install tqdm"
        )
    self.pbars = {}
    self.lock = Lock()
on_exec_start
Python
on_exec_start(plan_number: str, command: str) -> None

Create progress bar for this plan.

Source code in ras_commander/callbacks.py
Python
def on_exec_start(self, plan_number: str, command: str) -> None:
    """Create progress bar for this plan."""
    with self.lock:
        self.pbars[plan_number] = tqdm(
            desc=f"Plan {plan_number}",
            unit="msg",
            dynamic_ncols=True
        )
on_exec_message
Python
on_exec_message(plan_number: str, message: str) -> None

Update progress bar with new message.

Source code in ras_commander/callbacks.py
Python
def on_exec_message(self, plan_number: str, message: str) -> None:
    """Update progress bar with new message."""
    with self.lock:
        if plan_number in self.pbars:
            # Show last 50 characters of message
            short_message = message.strip()[-50:]
            self.pbars[plan_number].set_postfix_str(short_message)
            self.pbars[plan_number].update(1)
on_exec_complete
Python
on_exec_complete(plan_number: str, success: bool, duration: float) -> None

Close progress bar and display final status.

Source code in ras_commander/callbacks.py
Python
def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
    """Close progress bar and display final status."""
    with self.lock:
        if plan_number in self.pbars:
            status = "SUCCESS" if success else "FAILED"
            self.pbars[plan_number].set_postfix_str(f"{status} in {duration:.1f}s")
            self.pbars[plan_number].close()
            del self.pbars[plan_number]
SynchronizedCallback

Thread-safe wrapper for any callback implementation.

Wraps an existing callback to add thread-safety using locks. Useful when: - Using a callback that isn't thread-safe - Working with compute_parallel() - Sharing state across callbacks

Thread Safety

Wraps all callback methods with threading.Lock. Guarantees only one thread executes callback at a time.

Example

class MyCallback: ... def init(self): ... self.messages = [] # Not thread-safe! ... def on_exec_message(self, plan_number, message): ... self.messages.append((plan_number, message))

unsafe_callback = MyCallback() safe_callback = SynchronizedCallback(unsafe_callback) RasCmdr.compute_parallel(["01", "02"], stream_callback=safe_callback)

Source code in ras_commander/callbacks.py
Python
class SynchronizedCallback:
    """
    Thread-safe wrapper for any callback implementation.

    Wraps an existing callback to add thread-safety using locks.
    Useful when:
    - Using a callback that isn't thread-safe
    - Working with compute_parallel()
    - Sharing state across callbacks

    Thread Safety:
        Wraps all callback methods with threading.Lock.
        Guarantees only one thread executes callback at a time.

    Example:
        >>> class MyCallback:
        ...     def __init__(self):
        ...         self.messages = []  # Not thread-safe!
        ...     def on_exec_message(self, plan_number, message):
        ...         self.messages.append((plan_number, message))
        >>>
        >>> unsafe_callback = MyCallback()
        >>> safe_callback = SynchronizedCallback(unsafe_callback)
        >>> RasCmdr.compute_parallel(["01", "02"], stream_callback=safe_callback)
    """

    def __init__(self, callback: ExecutionCallback):
        """
        Wrap a callback with thread-safety.

        Args:
            callback: Callback object to wrap (must implement ExecutionCallback methods)
        """
        self._callback = callback
        self._lock = Lock()

    def on_prep_start(self, plan_number: str) -> None:
        """Thread-safe wrapper for on_prep_start."""
        with self._lock:
            if hasattr(self._callback, 'on_prep_start'):
                self._callback.on_prep_start(plan_number)

    def on_prep_complete(self, plan_number: str) -> None:
        """Thread-safe wrapper for on_prep_complete."""
        with self._lock:
            if hasattr(self._callback, 'on_prep_complete'):
                self._callback.on_prep_complete(plan_number)

    def on_exec_start(self, plan_number: str, command: str) -> None:
        """Thread-safe wrapper for on_exec_start."""
        with self._lock:
            if hasattr(self._callback, 'on_exec_start'):
                self._callback.on_exec_start(plan_number, command)

    def on_exec_message(self, plan_number: str, message: str) -> None:
        """Thread-safe wrapper for on_exec_message."""
        with self._lock:
            if hasattr(self._callback, 'on_exec_message'):
                self._callback.on_exec_message(plan_number, message)

    def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
        """Thread-safe wrapper for on_exec_complete."""
        with self._lock:
            if hasattr(self._callback, 'on_exec_complete'):
                self._callback.on_exec_complete(plan_number, success, duration)

    def on_verify_result(self, plan_number: str, verified: bool) -> None:
        """Thread-safe wrapper for on_verify_result."""
        with self._lock:
            if hasattr(self._callback, 'on_verify_result'):
                self._callback.on_verify_result(plan_number, verified)
__init__
Python
__init__(callback: ExecutionCallback)

Wrap a callback with thread-safety.

Parameters:

Name Type Description Default
callback ExecutionCallback

Callback object to wrap (must implement ExecutionCallback methods)

required
Source code in ras_commander/callbacks.py
Python
def __init__(self, callback: ExecutionCallback):
    """
    Wrap a callback with thread-safety.

    Args:
        callback: Callback object to wrap (must implement ExecutionCallback methods)
    """
    self._callback = callback
    self._lock = Lock()
on_prep_start
Python
on_prep_start(plan_number: str) -> None

Thread-safe wrapper for on_prep_start.

Source code in ras_commander/callbacks.py
Python
def on_prep_start(self, plan_number: str) -> None:
    """Thread-safe wrapper for on_prep_start."""
    with self._lock:
        if hasattr(self._callback, 'on_prep_start'):
            self._callback.on_prep_start(plan_number)
on_prep_complete
Python
on_prep_complete(plan_number: str) -> None

Thread-safe wrapper for on_prep_complete.

Source code in ras_commander/callbacks.py
Python
def on_prep_complete(self, plan_number: str) -> None:
    """Thread-safe wrapper for on_prep_complete."""
    with self._lock:
        if hasattr(self._callback, 'on_prep_complete'):
            self._callback.on_prep_complete(plan_number)
on_exec_start
Python
on_exec_start(plan_number: str, command: str) -> None

Thread-safe wrapper for on_exec_start.

Source code in ras_commander/callbacks.py
Python
def on_exec_start(self, plan_number: str, command: str) -> None:
    """Thread-safe wrapper for on_exec_start."""
    with self._lock:
        if hasattr(self._callback, 'on_exec_start'):
            self._callback.on_exec_start(plan_number, command)
on_exec_message
Python
on_exec_message(plan_number: str, message: str) -> None

Thread-safe wrapper for on_exec_message.

Source code in ras_commander/callbacks.py
Python
def on_exec_message(self, plan_number: str, message: str) -> None:
    """Thread-safe wrapper for on_exec_message."""
    with self._lock:
        if hasattr(self._callback, 'on_exec_message'):
            self._callback.on_exec_message(plan_number, message)
on_exec_complete
Python
on_exec_complete(plan_number: str, success: bool, duration: float) -> None

Thread-safe wrapper for on_exec_complete.

Source code in ras_commander/callbacks.py
Python
def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
    """Thread-safe wrapper for on_exec_complete."""
    with self._lock:
        if hasattr(self._callback, 'on_exec_complete'):
            self._callback.on_exec_complete(plan_number, success, duration)
on_verify_result
Python
on_verify_result(plan_number: str, verified: bool) -> None

Thread-safe wrapper for on_verify_result.

Source code in ras_commander/callbacks.py
Python
def on_verify_result(self, plan_number: str, verified: bool) -> None:
    """Thread-safe wrapper for on_verify_result."""
    with self._lock:
        if hasattr(self._callback, 'on_verify_result'):
            self._callback.on_verify_result(plan_number, verified)
Custom Callbacks

Create custom callbacks by implementing the ExecutionCallback protocol:

Python
class CustomCallback:
    """Minimal custom callback - implement only what you need."""

    def on_exec_complete(self, plan_number, success, duration):
        status = "SUCCESS" if success else "FAILED"
        print(f"Plan {plan_number}: {status} in {duration:.1f}s")

# Use it
RasCmdr.compute_plan("01", stream_callback=CustomCallback())

Thread Safety for Parallel Execution

Callbacks used with compute_parallel() must be thread-safe. Use threading.Lock for shared state:

Python
from threading import Lock

class ThreadSafeCallback:
    def __init__(self):
        self.lock = Lock()
        self.results = {}

    def on_exec_complete(self, plan_number, success, duration):
        with self.lock:
            self.results[plan_number] = (success, duration)
BcoMonitor Utility
BcoMonitor dataclass

Monitor HEC-RAS .bco file for execution signals and messages.

The .bco file is created when 'Write Detailed= 1' is set in the plan file. It contains detailed computation messages written incrementally during execution.

This class enables: - Real-time progress monitoring via .bco file polling - Message streaming to callbacks - Signal detection for early termination - Thread-safe operation (no shared state)

Attributes:

Name Type Description
project_path Path

Path to HEC-RAS project folder

plan_number str

Plan number (e.g., "01", "02")

project_name str

Project name (without extension)

signal_string str

String to detect in .bco for early termination

check_interval float

Seconds between .bco file polls (default: 0.5)

max_wait_seconds int

Maximum wait time before timeout (default: 300)

message_callback Optional[Callable[[str], None]]

Optional callback for new messages

Example

monitor = BcoMonitor( ... project_path=Path("/path/to/project"), ... plan_number="01", ... project_name="MyProject", ... message_callback=lambda msg: print(msg) ... ) process = subprocess.Popen(["RAS.exe", ...]) signal_detected = monitor.monitor_until_signal(process)

Source code in ras_commander/RasBco.py
Python
@dataclass
class BcoMonitor:
    """
    Monitor HEC-RAS .bco file for execution signals and messages.

    The .bco file is created when 'Write Detailed= 1' is set in the plan file.
    It contains detailed computation messages written incrementally during execution.

    This class enables:
    - Real-time progress monitoring via .bco file polling
    - Message streaming to callbacks
    - Signal detection for early termination
    - Thread-safe operation (no shared state)

    Attributes:
        project_path: Path to HEC-RAS project folder
        plan_number: Plan number (e.g., "01", "02")
        project_name: Project name (without extension)
        signal_string: String to detect in .bco for early termination
        check_interval: Seconds between .bco file polls (default: 0.5)
        max_wait_seconds: Maximum wait time before timeout (default: 300)
        message_callback: Optional callback for new messages

    Example:
        >>> monitor = BcoMonitor(
        ...     project_path=Path("/path/to/project"),
        ...     plan_number="01",
        ...     project_name="MyProject",
        ...     message_callback=lambda msg: print(msg)
        ... )
        >>> process = subprocess.Popen(["RAS.exe", ...])
        >>> signal_detected = monitor.monitor_until_signal(process)
    """

    # Configuration
    project_path: Path
    plan_number: str
    project_name: str
    signal_string: str = "Starting Unsteady Flow Computations"
    check_interval: float = 0.5
    max_wait_seconds: int = 300

    # Optional callback for streaming messages
    message_callback: Optional[Callable[[str], None]] = None

    # Internal state (initialized in __post_init__)
    bco_file: Path = field(init=False)
    execution_start_time: Optional[float] = field(default=None, init=False)
    _last_file_position: int = field(default=0, init=False)

    def __post_init__(self):
        """Initialize paths and validate configuration."""
        self.bco_file = self.project_path / f"{self.project_name}.bco{self.plan_number}"
        logger.debug(f"BcoMonitor initialized for {self.bco_file.name}")

    @staticmethod
    def enable_detailed_logging(plan_file_path: Path) -> bool:
        """
        Enable detailed logging in a plan file by setting 'Write Detailed= 1'.

        This creates a .bcoXX file during HEC-RAS execution that can be monitored
        for the "Starting Unsteady Flow Computations" signal and other messages.

        Args:
            plan_file_path: Path to the plan file (.pXX)

        Returns:
            bool: True if successful, False otherwise

        Note:
            This modifies the plan file in-place. The modification is safe and
            follows HEC-RAS plan file format conventions.
        """
        try:
            content = plan_file_path.read_text(encoding='utf-8', errors='ignore')

            # Check if Write Detailed line exists
            if "Write Detailed=" in content:
                # Replace existing setting
                new_content = re.sub(
                    r'Write Detailed=\s*\d+',
                    'Write Detailed= 1',
                    content
                )
                if new_content != content:
                    plan_file_path.write_text(new_content, encoding='utf-8')
                    logger.debug(f"Enabled detailed logging in {plan_file_path.name}")
            else:
                # Add the setting after Run HTab line or at the end
                if "Run HTab=" in content:
                    new_content = content.replace(
                        "Run HTab=",
                        "Write Detailed= 1\nRun HTab="
                    )
                else:
                    new_content = content + "\nWrite Detailed= 1\n"
                plan_file_path.write_text(new_content, encoding='utf-8')
                logger.debug(f"Added detailed logging setting to {plan_file_path.name}")

            return True
        except Exception as e:
            logger.warning(f"Could not enable detailed logging: {e}")
            return False

    def monitor_until_signal(self, process: subprocess.Popen) -> bool:
        """
        Monitor .bco file until signal string appears or process completes.

        This method polls the .bco file at regular intervals, checking for:
        1. Process completion (normal exit or crash)
        2. Signal string detection (for early termination)
        3. New messages (streamed to callback if provided)

        Args:
            process: Running subprocess.Popen instance

        Returns:
            bool: True if signal detected, False if process completed without signal

        Note:
            - This is a blocking call that returns when signal appears or process exits
            - Callbacks are invoked from the calling thread (not a new thread)
            - File is read incrementally to minimize I/O overhead
        """
        self.execution_start_time = time.time()
        start_time = time.time()

        logger.info(f"Monitoring {self.bco_file.name} for '{self.signal_string}' signal...")

        while time.time() - start_time < self.max_wait_seconds:
            # Check if process died
            if process.poll() is not None:
                logger.info(f"Process exited with code {process.returncode}")
                # Read any final messages
                if self.bco_file.exists():
                    self._read_and_callback_new_content()
                return False

            # Check for .bco file with signal detection
            if self.bco_file.exists():
                # Verify file was modified after we started execution
                file_mtime = self.bco_file.stat().st_mtime
                if file_mtime >= self.execution_start_time:
                    # Read new content and check for signal
                    content = self._read_and_callback_new_content()
                    if content and self.signal_string in content:
                        logger.info(f"Detected '{self.signal_string}' in {self.bco_file.name}")
                        return True

            time.sleep(self.check_interval)

        logger.warning(f"Monitoring timed out after {self.max_wait_seconds}s")
        return False

    def get_final_messages(self) -> Optional[str]:
        """
        Read complete .bco file content after execution.

        Returns:
            Optional[str]: Full .bco file content, or None if file doesn't exist

        Note:
            Uses encoding resilience (errors='ignore') to handle partially written files.
        """
        if not self.bco_file.exists():
            return None

        try:
            return self.bco_file.read_text(encoding='utf-8', errors='ignore')
        except Exception as e:
            logger.debug(f"Could not read .bco file: {e}")
            return None

    def _read_and_callback_new_content(self) -> Optional[str]:
        """
        Read new content since last position and invoke callback if provided.

        Returns:
            Optional[str]: New content read, or None if error/no new content

        Note:
            Updates internal _last_file_position to track reading progress.
        """
        try:
            # Get current file size
            file_size = self.bco_file.stat().st_size

            # No new content since last read
            if file_size <= self._last_file_position:
                return None

            # Read from last position to current end
            with open(self.bco_file, 'r', encoding='utf-8', errors='ignore') as f:
                f.seek(self._last_file_position)
                new_content = f.read()
                self._last_file_position = f.tell()

            # Invoke callback with new content if provided
            if new_content and self.message_callback:
                # Split into lines and call back for each non-empty line
                for line in new_content.splitlines():
                    if line.strip():
                        try:
                            self.message_callback(line)
                        except Exception as e:
                            logger.warning(f"Callback error: {e}")

            return new_content

        except Exception as e:
            logger.debug(f"Could not read new .bco content: {e}")
            return None

    def has_signal(self) -> bool:
        """
        Check if signal string exists in .bco file.

        Returns:
            bool: True if signal detected, False otherwise
        """
        content = self.get_final_messages()
        return content is not None and self.signal_string in content
enable_detailed_logging staticmethod
Python
enable_detailed_logging(plan_file_path: Path) -> bool

Enable detailed logging in a plan file by setting 'Write Detailed= 1'.

This creates a .bcoXX file during HEC-RAS execution that can be monitored for the "Starting Unsteady Flow Computations" signal and other messages.

Parameters:

Name Type Description Default
plan_file_path Path

Path to the plan file (.pXX)

required

Returns:

Name Type Description
bool bool

True if successful, False otherwise

Note

This modifies the plan file in-place. The modification is safe and follows HEC-RAS plan file format conventions.

Source code in ras_commander/RasBco.py
Python
@staticmethod
def enable_detailed_logging(plan_file_path: Path) -> bool:
    """
    Enable detailed logging in a plan file by setting 'Write Detailed= 1'.

    This creates a .bcoXX file during HEC-RAS execution that can be monitored
    for the "Starting Unsteady Flow Computations" signal and other messages.

    Args:
        plan_file_path: Path to the plan file (.pXX)

    Returns:
        bool: True if successful, False otherwise

    Note:
        This modifies the plan file in-place. The modification is safe and
        follows HEC-RAS plan file format conventions.
    """
    try:
        content = plan_file_path.read_text(encoding='utf-8', errors='ignore')

        # Check if Write Detailed line exists
        if "Write Detailed=" in content:
            # Replace existing setting
            new_content = re.sub(
                r'Write Detailed=\s*\d+',
                'Write Detailed= 1',
                content
            )
            if new_content != content:
                plan_file_path.write_text(new_content, encoding='utf-8')
                logger.debug(f"Enabled detailed logging in {plan_file_path.name}")
        else:
            # Add the setting after Run HTab line or at the end
            if "Run HTab=" in content:
                new_content = content.replace(
                    "Run HTab=",
                    "Write Detailed= 1\nRun HTab="
                )
            else:
                new_content = content + "\nWrite Detailed= 1\n"
            plan_file_path.write_text(new_content, encoding='utf-8')
            logger.debug(f"Added detailed logging setting to {plan_file_path.name}")

        return True
    except Exception as e:
        logger.warning(f"Could not enable detailed logging: {e}")
        return False
monitor_until_signal
Python
monitor_until_signal(process: Popen) -> bool

Monitor .bco file until signal string appears or process completes.

This method polls the .bco file at regular intervals, checking for: 1. Process completion (normal exit or crash) 2. Signal string detection (for early termination) 3. New messages (streamed to callback if provided)

Parameters:

Name Type Description Default
process Popen

Running subprocess.Popen instance

required

Returns:

Name Type Description
bool bool

True if signal detected, False if process completed without signal

Note
  • This is a blocking call that returns when signal appears or process exits
  • Callbacks are invoked from the calling thread (not a new thread)
  • File is read incrementally to minimize I/O overhead
Source code in ras_commander/RasBco.py
Python
def monitor_until_signal(self, process: subprocess.Popen) -> bool:
    """
    Monitor .bco file until signal string appears or process completes.

    This method polls the .bco file at regular intervals, checking for:
    1. Process completion (normal exit or crash)
    2. Signal string detection (for early termination)
    3. New messages (streamed to callback if provided)

    Args:
        process: Running subprocess.Popen instance

    Returns:
        bool: True if signal detected, False if process completed without signal

    Note:
        - This is a blocking call that returns when signal appears or process exits
        - Callbacks are invoked from the calling thread (not a new thread)
        - File is read incrementally to minimize I/O overhead
    """
    self.execution_start_time = time.time()
    start_time = time.time()

    logger.info(f"Monitoring {self.bco_file.name} for '{self.signal_string}' signal...")

    while time.time() - start_time < self.max_wait_seconds:
        # Check if process died
        if process.poll() is not None:
            logger.info(f"Process exited with code {process.returncode}")
            # Read any final messages
            if self.bco_file.exists():
                self._read_and_callback_new_content()
            return False

        # Check for .bco file with signal detection
        if self.bco_file.exists():
            # Verify file was modified after we started execution
            file_mtime = self.bco_file.stat().st_mtime
            if file_mtime >= self.execution_start_time:
                # Read new content and check for signal
                content = self._read_and_callback_new_content()
                if content and self.signal_string in content:
                    logger.info(f"Detected '{self.signal_string}' in {self.bco_file.name}")
                    return True

        time.sleep(self.check_interval)

    logger.warning(f"Monitoring timed out after {self.max_wait_seconds}s")
    return False
ExecutionCallback Protocol
ExecutionCallback

ExecutionCallback - Protocol for HEC-RAS execution progress callbacks.

This module defines the callback interface for monitoring HEC-RAS computation lifecycle events. Callbacks enable real-time progress tracking, logging, and UI updates during long-running simulations.

The Protocol pattern allows partial implementation - classes only need to implement the callback methods they care about.

ExecutionCallback

Bases: Protocol

Protocol for execution progress callbacks.

This defines the interface for monitoring HEC-RAS computation lifecycle. Implementations can provide any subset of these methods - all are optional.

Lifecycle Order
  1. on_prep_start() - Before geometry preprocessing
  2. on_prep_complete() - After preprocessing
  3. on_exec_start() - HEC-RAS subprocess started
  4. on_exec_message() - During execution (potentially many calls)
  5. on_exec_complete() - HEC-RAS subprocess finished
  6. on_verify_result() - After HDF verification (if verify=True)
Thread Safety

When used with compute_parallel(), callbacks are invoked from worker threads concurrently. Implementations MUST be thread-safe. Use locks, thread-local storage, or atomic operations as needed.

Example - Simple Console Logging

class ConsoleCallback: ... def on_exec_start(self, plan_number, command): ... print(f"[{plan_number}] Starting...") ... def on_exec_message(self, plan_number, message): ... print(f"[{plan_number}] {message}") ... def on_exec_complete(self, plan_number, success, duration): ... print(f"[{plan_number}] Done in {duration:.1f}s")

Example - Thread-Safe File Logging

from threading import Lock class FileCallback: ... def init(self): ... self.lock = Lock() ... self.files = {} ... def on_exec_start(self, plan_number, command): ... with self.lock: ... self.files[plan_number] = open(f"plan_{plan_number}.log", 'w') ... def on_exec_message(self, plan_number, message): ... with self.lock: ... if plan_number in self.files: ... self.files[plan_number].write(message + '\n')

Source code in ras_commander/ExecutionCallback.py
Python
@runtime_checkable
class ExecutionCallback(Protocol):
    """
    Protocol for execution progress callbacks.

    This defines the interface for monitoring HEC-RAS computation lifecycle.
    Implementations can provide any subset of these methods - all are optional.

    Lifecycle Order:
        1. on_prep_start()     - Before geometry preprocessing
        2. on_prep_complete()  - After preprocessing
        3. on_exec_start()     - HEC-RAS subprocess started
        4. on_exec_message()   - During execution (potentially many calls)
        5. on_exec_complete()  - HEC-RAS subprocess finished
        6. on_verify_result()  - After HDF verification (if verify=True)

    Thread Safety:
        When used with compute_parallel(), callbacks are invoked from
        worker threads concurrently. Implementations MUST be thread-safe.
        Use locks, thread-local storage, or atomic operations as needed.

    Example - Simple Console Logging:
        >>> class ConsoleCallback:
        ...     def on_exec_start(self, plan_number, command):
        ...         print(f"[{plan_number}] Starting...")
        ...     def on_exec_message(self, plan_number, message):
        ...         print(f"[{plan_number}] {message}")
        ...     def on_exec_complete(self, plan_number, success, duration):
        ...         print(f"[{plan_number}] Done in {duration:.1f}s")

    Example - Thread-Safe File Logging:
        >>> from threading import Lock
        >>> class FileCallback:
        ...     def __init__(self):
        ...         self.lock = Lock()
        ...         self.files = {}
        ...     def on_exec_start(self, plan_number, command):
        ...         with self.lock:
        ...             self.files[plan_number] = open(f"plan_{plan_number}.log", 'w')
        ...     def on_exec_message(self, plan_number, message):
        ...         with self.lock:
        ...             if plan_number in self.files:
        ...                 self.files[plan_number].write(message + '\\n')
    """

    def on_prep_start(self, plan_number: str) -> None:
        """
        Called before geometry preprocessing and core setup.

        This is invoked before:
        - Geometry preprocessor file clearing (if clear_geompre=True)
        - Number of cores configuration (if num_cores specified)

        Args:
            plan_number: Plan identifier (e.g., "01", "02")

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
        """
        ...

    def on_prep_complete(self, plan_number: str) -> None:
        """
        Called after geometry preprocessing and core setup complete.

        This is invoked after:
        - Geometry preprocessor files cleared (if applicable)
        - Number of cores set in plan file (if applicable)
        - Just before HEC-RAS subprocess starts

        Args:
            plan_number: Plan identifier (e.g., "01", "02")

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
        """
        ...

    def on_exec_start(self, plan_number: str, command: str) -> None:
        """
        Called when HEC-RAS subprocess starts.

        This is invoked immediately before subprocess execution begins.
        The command includes the full command line that will be executed.

        Args:
            plan_number: Plan identifier (e.g., "01", "02")
            command: Full command line (e.g., '"C:/RAS/RAS.exe" -c project.prj plan.p01')

        Note:
            At this point the subprocess has been constructed but not yet started.
            This is the last callback before HEC-RAS begins running.

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
        """
        ...

    def on_exec_message(self, plan_number: str, message: str) -> None:
        """
        Called for each new .bco file message during execution.

        This is invoked repeatedly as HEC-RAS writes to the .bco file.
        Messages are streamed line-by-line in near real-time (polling interval: 0.5s).

        Args:
            plan_number: Plan identifier (e.g., "01", "02")
            message: Single line from .bco file (newline stripped)

        Frequency:
            - Called potentially hundreds or thousands of times per plan
            - Frequency depends on HEC-RAS computation complexity
            - Polling interval: 0.5 seconds (configurable in BcoMonitor)

        Performance:
            - Keep callback implementation FAST (< 1ms recommended)
            - Avoid blocking I/O, network calls, or heavy computation
            - For expensive operations, queue messages and process in separate thread

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
            CRITICAL: Implement proper locking if writing to shared resources.

        Example Messages:
            - "Geometry Preprocessor Version 6.6"
            - "Computing Cross Section HTAB's"
            - "Starting Unsteady Flow Computations"
            - "Time: 01JAN2020 0600 [  1.25% Done]"
        """
        ...

    def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
        """
        Called when HEC-RAS execution finishes.

        This is invoked immediately after subprocess completes (successfully or not).

        Args:
            plan_number: Plan identifier (e.g., "01", "02")
            success: True if subprocess exited with code 0, False otherwise
            duration: Execution time in seconds (floating point)

        Note:
            - success=True does NOT guarantee HEC-RAS succeeded (it may have errors)
            - Use on_verify_result() to check if HDF contains "Complete Process"
            - duration is wall-clock time, not CPU time

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
        """
        ...

    def on_verify_result(self, plan_number: str, verified: bool) -> None:
        """
        Called after HDF verification (only if verify=True parameter used).

        This is invoked after checking HDF file for "Complete Process" message.

        Args:
            plan_number: Plan identifier (e.g., "01", "02")
            verified: True if HDF contains "Complete Process", False otherwise

        Note:
            - Only called when RasCmdr.compute_plan(..., verify=True)
            - verified=True is the strongest guarantee that HEC-RAS succeeded
            - verified=False may indicate computation errors or incomplete results

        Thread Safety:
            May be called concurrently for different plans in compute_parallel().
        """
        ...
on_prep_start
Python
on_prep_start(plan_number: str) -> None

Called before geometry preprocessing and core setup.

This is invoked before: - Geometry preprocessor file clearing (if clear_geompre=True) - Number of cores configuration (if num_cores specified)

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
Thread Safety

May be called concurrently for different plans in compute_parallel().

Source code in ras_commander/ExecutionCallback.py
Python
def on_prep_start(self, plan_number: str) -> None:
    """
    Called before geometry preprocessing and core setup.

    This is invoked before:
    - Geometry preprocessor file clearing (if clear_geompre=True)
    - Number of cores configuration (if num_cores specified)

    Args:
        plan_number: Plan identifier (e.g., "01", "02")

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
    """
    ...
on_prep_complete
Python
on_prep_complete(plan_number: str) -> None

Called after geometry preprocessing and core setup complete.

This is invoked after: - Geometry preprocessor files cleared (if applicable) - Number of cores set in plan file (if applicable) - Just before HEC-RAS subprocess starts

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
Thread Safety

May be called concurrently for different plans in compute_parallel().

Source code in ras_commander/ExecutionCallback.py
Python
def on_prep_complete(self, plan_number: str) -> None:
    """
    Called after geometry preprocessing and core setup complete.

    This is invoked after:
    - Geometry preprocessor files cleared (if applicable)
    - Number of cores set in plan file (if applicable)
    - Just before HEC-RAS subprocess starts

    Args:
        plan_number: Plan identifier (e.g., "01", "02")

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
    """
    ...
on_exec_start
Python
on_exec_start(plan_number: str, command: str) -> None

Called when HEC-RAS subprocess starts.

This is invoked immediately before subprocess execution begins. The command includes the full command line that will be executed.

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
command str

Full command line (e.g., '"C:/RAS/RAS.exe" -c project.prj plan.p01')

required
Note

At this point the subprocess has been constructed but not yet started. This is the last callback before HEC-RAS begins running.

Thread Safety

May be called concurrently for different plans in compute_parallel().

Source code in ras_commander/ExecutionCallback.py
Python
def on_exec_start(self, plan_number: str, command: str) -> None:
    """
    Called when HEC-RAS subprocess starts.

    This is invoked immediately before subprocess execution begins.
    The command includes the full command line that will be executed.

    Args:
        plan_number: Plan identifier (e.g., "01", "02")
        command: Full command line (e.g., '"C:/RAS/RAS.exe" -c project.prj plan.p01')

    Note:
        At this point the subprocess has been constructed but not yet started.
        This is the last callback before HEC-RAS begins running.

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
    """
    ...
on_exec_message
Python
on_exec_message(plan_number: str, message: str) -> None

Called for each new .bco file message during execution.

This is invoked repeatedly as HEC-RAS writes to the .bco file. Messages are streamed line-by-line in near real-time (polling interval: 0.5s).

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
message str

Single line from .bco file (newline stripped)

required
Frequency
  • Called potentially hundreds or thousands of times per plan
  • Frequency depends on HEC-RAS computation complexity
  • Polling interval: 0.5 seconds (configurable in BcoMonitor)
Performance
  • Keep callback implementation FAST (< 1ms recommended)
  • Avoid blocking I/O, network calls, or heavy computation
  • For expensive operations, queue messages and process in separate thread
Thread Safety

May be called concurrently for different plans in compute_parallel(). CRITICAL: Implement proper locking if writing to shared resources.

Example Messages
  • "Geometry Preprocessor Version 6.6"
  • "Computing Cross Section HTAB's"
  • "Starting Unsteady Flow Computations"
  • "Time: 01JAN2020 0600 [ 1.25% Done]"
Source code in ras_commander/ExecutionCallback.py
Python
def on_exec_message(self, plan_number: str, message: str) -> None:
    """
    Called for each new .bco file message during execution.

    This is invoked repeatedly as HEC-RAS writes to the .bco file.
    Messages are streamed line-by-line in near real-time (polling interval: 0.5s).

    Args:
        plan_number: Plan identifier (e.g., "01", "02")
        message: Single line from .bco file (newline stripped)

    Frequency:
        - Called potentially hundreds or thousands of times per plan
        - Frequency depends on HEC-RAS computation complexity
        - Polling interval: 0.5 seconds (configurable in BcoMonitor)

    Performance:
        - Keep callback implementation FAST (< 1ms recommended)
        - Avoid blocking I/O, network calls, or heavy computation
        - For expensive operations, queue messages and process in separate thread

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
        CRITICAL: Implement proper locking if writing to shared resources.

    Example Messages:
        - "Geometry Preprocessor Version 6.6"
        - "Computing Cross Section HTAB's"
        - "Starting Unsteady Flow Computations"
        - "Time: 01JAN2020 0600 [  1.25% Done]"
    """
    ...
on_exec_complete
Python
on_exec_complete(plan_number: str, success: bool, duration: float) -> None

Called when HEC-RAS execution finishes.

This is invoked immediately after subprocess completes (successfully or not).

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
success bool

True if subprocess exited with code 0, False otherwise

required
duration float

Execution time in seconds (floating point)

required
Note
  • success=True does NOT guarantee HEC-RAS succeeded (it may have errors)
  • Use on_verify_result() to check if HDF contains "Complete Process"
  • duration is wall-clock time, not CPU time
Thread Safety

May be called concurrently for different plans in compute_parallel().

Source code in ras_commander/ExecutionCallback.py
Python
def on_exec_complete(self, plan_number: str, success: bool, duration: float) -> None:
    """
    Called when HEC-RAS execution finishes.

    This is invoked immediately after subprocess completes (successfully or not).

    Args:
        plan_number: Plan identifier (e.g., "01", "02")
        success: True if subprocess exited with code 0, False otherwise
        duration: Execution time in seconds (floating point)

    Note:
        - success=True does NOT guarantee HEC-RAS succeeded (it may have errors)
        - Use on_verify_result() to check if HDF contains "Complete Process"
        - duration is wall-clock time, not CPU time

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
    """
    ...
on_verify_result
Python
on_verify_result(plan_number: str, verified: bool) -> None

Called after HDF verification (only if verify=True parameter used).

This is invoked after checking HDF file for "Complete Process" message.

Parameters:

Name Type Description Default
plan_number str

Plan identifier (e.g., "01", "02")

required
verified bool

True if HDF contains "Complete Process", False otherwise

required
Note
  • Only called when RasCmdr.compute_plan(..., verify=True)
  • verified=True is the strongest guarantee that HEC-RAS succeeded
  • verified=False may indicate computation errors or incomplete results
Thread Safety

May be called concurrently for different plans in compute_parallel().

Source code in ras_commander/ExecutionCallback.py
Python
def on_verify_result(self, plan_number: str, verified: bool) -> None:
    """
    Called after HDF verification (only if verify=True parameter used).

    This is invoked after checking HDF file for "Complete Process" message.

    Args:
        plan_number: Plan identifier (e.g., "01", "02")
        verified: True if HDF contains "Complete Process", False otherwise

    Note:
        - Only called when RasCmdr.compute_plan(..., verify=True)
        - verified=True is the strongest guarantee that HEC-RAS succeeded
        - verified=False may indicate computation errors or incomplete results

    Thread Safety:
        May be called concurrently for different plans in compute_parallel().
    """
    ...

All callback methods are optional - implement only what you need. The protocol uses @runtime_checkable for flexible duck-typing.

RasControl

RasControl

RasControl - HECRASController API Wrapper (ras-commander style)

Provides ras-commander style API for legacy HEC-RAS versions (3.x-4.x) that use HECRASController COM interface instead of HDF files.

Includes robust process management with session tracking, orphan detection, and optional watchdog protection for Jupyter kernel restarts.

Public functions (HEC-RAS Operations): - RasControl.run_plan(plan, ras_object=None, force_recompute=False, use_watchdog=True, max_runtime=3600) -> Tuple[bool, List[str]] - RasControl.get_steady_results(plan, ras_object=None) -> pandas.DataFrame - RasControl.get_unsteady_results(plan, max_times=None, ras_object=None) -> pandas.DataFrame - RasControl.get_output_times(plan, ras_object=None) -> List[str] - RasControl.get_plans(plan, ras_object=None) -> List[dict] - RasControl.set_current_plan(plan, ras_object=None) -> bool - RasControl.get_comp_msgs(plan, ras_object=None) -> str

Public functions (Process Management): - RasControl.list_processes(show_all=False) -> pandas.DataFrame - RasControl.scan_orphans() -> List[SessionLock] - RasControl.cleanup_orphans(interactive=True, dry_run=False) -> int - RasControl.force_cleanup_all() -> int

Private functions: - _terminate_ras_process() -> None - _is_ras_running() -> bool - RasControl._normalize_version(version: str) -> str - RasControl._get_project_info(plan, ras_object=None) -> Tuple[Path, str, Optional[str], Optional[str]] - RasControl._com_open_close(project_path: Path, version: str, operation_func: Callable[[Any], Any]) -> Any

Session tracking infrastructure: - SessionLock dataclass - Tracks active COM sessions with lock files - Module-level _active_sessions dict - Tracks all active sessions - atexit handler - Emergency cleanup on Python exit - Watchdog support - Optional independent process for kernel restart protection

RasControl Details

Open-Operate-Close Pattern

Unlike other ras-commander classes, RasControl opens HEC-RAS, performs one operation, then closes it. This prevents conflicts with modern workflows and ensures clean resource management.

Supported Versions
Version Registry Key HEC-RAS Years
"31" 3.1 Legacy
"41" 4.1 ~2008-2014
"501", "503", "505", "506" 5.0.x 2015-2019
"60" 6.0 2020
"63" 6.3 2021-2022
"66" 6.6 2023-2024
"70" 7.0 2025+
RasControl vs RasCmdr
Aspect RasControl RasCmdr
HEC-RAS Versions 3.x - 7.x (COM) 5.x+ (command line)
Data Source Live COM extraction HDF file results
Requires GUI Yes (HEC-RAS installed) Yes (HEC-RAS installed)
Use Case Legacy models, validation Modern automation
Returns pandas DataFrame bool / dict
Understanding "Max WS" in Unsteady Results

When extracting unsteady results, the first row per cross section (time_index=1) contains "Max WS" - the maximum at ANY computational timestep:

Python
# Unsteady results include special "Max WS" row
df = RasControl.get_unsteady_results("01")

# time_index=1 is "Max WS" (maximum at any timestep)
df_max = df[df['time_string'] == 'Max WS']

# time_index=2+ are actual output intervals
df_timeseries = df[df['time_string'] != 'Max WS']

# Parse datetime for analysis
df_timeseries['datetime'] = pd.to_datetime(
    df_timeseries['time_string'],
    format='%d%b%Y %H%M'
)

Max WS vs Output Interval Maximums

"Max WS" captures peaks that may occur BETWEEN output intervals. This is critical for design applications - always use "Max WS" for peak values, not max() of output intervals.

Result Columns

Steady Results (get_steady_results):

Column Type Description
river str River name
reach str Reach name
node_id str Cross section station
profile str Profile name
wsel float Water surface elevation
velocity float Total velocity
flow float Total flow
froude float Froude number
energy float Energy grade elevation
max_depth float Maximum channel depth
min_ch_el float Minimum channel elevation

Unsteady Results (get_unsteady_results): Same columns plus time_index, time_string, datetime.

Compute Messages Fallback

The get_comp_msgs() method attempts to read computation messages from multiple sources:

  1. First tries .computeMsgs.txt (modern format)
  2. Falls back to .comp_msgs.txt (legacy format)
  3. Returns empty string if neither exists

File Operations

RasPlan

RasPlan

RasPlan - Operations for handling plan files in HEC-RAS projects

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()). 3. Obtain the logger using: logger = logging.getLogger(name)

Example

@log_call def my_function(): logger = logging.getLogger(name) logger.debug("Additional debug information") # Function logic here


All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasPlan: - set_geom(): Set the geometry for a specified plan - set_steady(): Apply a steady flow file to a plan file - set_unsteady(): Apply an unsteady flow file to a plan file - set_num_cores(): Update the maximum number of cores to use - set_geom_preprocessor(): Update geometry preprocessor settings - clone_plan(): Create a new plan file based on a template - clone_unsteady(): Copy unsteady flow files from a template - clone_steady(): Copy steady flow files from a template - clone_geom(): Copy geometry files from a template - get_next_number(): Determine the next available number from a list - get_plan_value(): Retrieve a specific value from a plan file - get_results_path(): Get the results file path for a plan - get_plan_path(): Get the full path for a plan number - get_flow_path(): Get the full path for a flow number - get_unsteady_path(): Get the full path for an unsteady number - get_geom_path(): Get the full path for a geometry number - update_run_flags(): Update various run flags in a plan file - update_plan_intervals(): Update computation and output intervals - update_plan_description(): Update the description in a plan file - read_plan_description(): Read the description from a plan file - read_geom_description(): Read the description from a geometry file - update_geom_description(): Update the description in a geometry file - read_flow_description(): Read the description from a steady flow file - update_flow_description(): Update the description in a steady flow file - update_simulation_date(): Update simulation start and end dates - get_restart_output_settings(): Parse restart/Hot Start output settings - set_restart_output_settings(): Configure restart/Hot Start output settings - get_shortid(): Get the Short Identifier from a plan file - set_shortid(): Set the Short Identifier in a plan file - get_plan_title(): Get the Plan Title from a plan file - set_plan_title(): Set the Plan Title in a plan file - delete_plan(): Delete a plan and its associated files - renumber_plan(): Renumber a plan file and update references - delete_geom(): Delete a geometry file and its associated files - renumber_geom(): Renumber a geometry file and update references - delete_unsteady(): Delete an unsteady flow file - renumber_unsteady(): Renumber an unsteady flow file and update references - delete_steady(): Delete a steady flow file - renumber_steady(): Renumber a steady flow file and update references

RasFlowOptimization

RasFlowOptimization

RasFlowOptimization - Native HEC-RAS flow hydrograph optimization helpers.

This module configures HEC-RAS Automated Flow Optimization using the native Flow Ratio ... plan-file parameters and extracts trial summaries from HDF results or computation messages after execution.

All methods are static and are designed to be used without instantiation.

RasGeo

RasGeo

RasGeo - Operations for handling geometry files in HEC-RAS projects

DEPRECATION NOTICE

This class is deprecated and will be removed before v1.0. Please migrate to the new geometry subpackage classes: - GeomPreprocessor.clear_geompre_files() - replaces RasGeo.clear_geompre_files() - GeomLandCover.get_base_mannings_n() - replaces RasGeo.get_mannings_baseoverrides() - GeomLandCover.set_base_mannings_n() - replaces RasGeo.set_mannings_baseoverrides() - GeomLandCover.get_region_mannings_n() - replaces RasGeo.get_mannings_regionoverrides() - GeomLandCover.set_region_mannings_n() - replaces RasGeo.set_mannings_regionoverrides()

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasGeo: - clear_geompre_files(): Clears geometry preprocessor files for specified plan files [DEPRECATED] - get_mannings_baseoverrides(): Reads base Manning's n table from a geometry file [DEPRECATED] - get_mannings_regionoverrides(): Reads Manning's n region overrides from a geometry file [DEPRECATED] - set_mannings_baseoverrides(): Writes base Manning's n values to a geometry file [DEPRECATED] - set_mannings_regionoverrides(): Writes regional Manning's n overrides to a geometry file [DEPRECATED] - clone_geom(): Copy geometry files from a template with optional title and description

RasUnsteady

RasUnsteady

RasUnsteady - Operations for handling unsteady flow files in HEC-RAS projects.

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()).

Example

@log_call def my_function(): logger.debug("Additional debug information") # Function logic here


All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasUnsteady: - update_flow_title() - read_unsteady_description() - update_unsteady_description() - update_restart_settings() - set_restart_settings() - get_restart_settings() - set_hydrograph_fixed_start_time() - extract_boundary_and_tables() - print_boundaries_and_tables() - identify_tables() - parse_fixed_width_table() - extract_tables() - write_table_to_file() - get_met_precipitation_config() - set_precipitation_hyetograph() - set_constant_precipitation() - set_gridded_precipitation() - configure_gridded_dss_precipitation() - set_meteorological_station() - get_meteorological_stations() - set_point_evapotranspiration() - get_point_evapotranspiration()

Precipitation Functions: - get_met_precipitation_config() - Read Meteorological Data tab precipitation settings - set_precipitation_hyetograph() - Write hyetograph DataFrame to unsteady file - set_gridded_precipitation() - Configure GDAL raster precipitation - configure_gridded_dss_precipitation() - Configure gridded DSS precipitation

Meteorological Point Data Functions: - set_meteorological_station() - Create/update meteorological station metadata - get_meteorological_stations() - Parse meteorological station metadata - set_point_evapotranspiration() - Write point ET series from a DataFrame - get_point_evapotranspiration() - Parse point ET series into a DataFrame

DSS Boundary Condition Functions: - get_dss_boundaries() - Extract all DSS-linked BCs with full path info - get_inline_hydrograph_boundaries() - Extract inline table BCs with time series data - delete_boundary() - Remove one Boundary Location block from an unsteady file - update_dss_run_identifier() - Update DSS path F-part for new scenarios - set_boundary_dss_link() - Convert inline BC to DSS-linked (complete state transition) - set_boundary_inline_hydrograph() - Write inline hydrograph, convert DSS to inline - set_flow_hydrograph_slope() - Add or update Flow Hydrograph Slope= (EG slope) for a Flow Hydrograph BC - set_normal_depth_boundary() - Add or update Normal Depth (Friction Slope=) for a 1D river or 2D BC line boundary - get_unique_dss_subbasins() - Get unique HMS subbasin names from DSS paths - update_dss_path_by_station() - Update DSS A-part for specific river station - update_flow_multiplier_by_station() - Update/insert QMult for specific river station - update_boundary_dss_paths() - Batch update DSS paths and multipliers - get_rating_curve() - Read Rating Curve (stage, discharge) pairs from a boundary - set_rating_curve() - Write or replace Rating Curve data on a boundary

Stage/Flow Hydrograph Functions (Internal Boundary): - get_stage_flow_hydrograph() - Read observed stage/flow pairs from an internal BC - set_stage_flow_hydrograph() - Write observed stage/flow pairs to an internal BC

Lateral Inflow Hydrograph Functions: - get_lateral_inflow_hydrograph() - Read lateral inflow hydrograph data from a boundary - set_lateral_inflow_hydrograph() - Write lateral inflow hydrograph data to a boundary

Uniform Lateral Inflow Hydrograph Functions: - get_uniform_lateral_inflow_hydrograph() - Read uniform lateral inflow data (reach-based BC) - set_uniform_lateral_inflow_hydrograph() - Write uniform lateral inflow data (reach-based BC)

Initial Conditions Method Selection: - get_initial_flow_method() - Determine which IC method is active (restart_file, prior_ws, initial_flow_distribution, none) - set_initial_flow_method() - Set the IC method selection (restart_file, prior_ws, initial_flow_distribution, none) - get_prior_ws_filename() - Read Prior WS Filename and Profile from unsteady file - set_prior_ws_filename() - Write Prior WS Filename and Profile to unsteady file

Initial Flow Distribution Table: - get_initial_conditions() - Read all IC entries (flow, storage, rrr) as DataFrame - set_initial_conditions() - Write IC entries from list of dicts or DataFrame (auto-sets IC method) - validate_initial_flow_stations() - Check IC flow stations match geometry cross sections

Non-Newtonian Method Selection: - get_non_newtonian_method() - Read the Non-Newtonian method integer and return name - set_non_newtonian_method() - Set the Non-Newtonian method by integer or name

RasSteady

RasSteady

RasSteady - Read and author HEC-RAS steady flow files (.f##).

All methods are static and are designed to be used without instantiation.

Utilities

RasUtils

RasUtils

RasUtils - Utility functions for the ras-commander library

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()).

Example

@log_call def my_function(): logger.debug("Additional debug information") # Function logic here


All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasUtils: - create_directory() - safe_resolve() - find_files_by_extension() - get_file_size() - get_file_modification_time() - normalize_ras_number() - get_plan_path() - remove_with_retry() - update_plan_file() - check_file_access() - convert_to_dataframe() - save_to_excel() - calculate_rmse() - calculate_percent_bias() - calculate_error_metrics() - update_file() - get_next_number() - clone_file() - update_project_file() - remove_prj_entry() - rename_prj_entry() - decode_byte_strings() - perform_kdtree_query() - find_nearest_neighbors() - consolidate_dataframe() - find_nearest_value() - horizontal_distance() - find_valid_ras_folders() - is_valid_ras_folder() - safe_write_geometry() # Phase 2.1 - Atomic file write with backup - rollback_geometry() # Phase 2.1 - Restore from backup - validate_geometry_file_basic() # Phase 2.1 - Basic validation - backup_files() # Move files to timestamped Backup folder (safe deletion) - _read_description_block() # Internal - Read BEGIN DESCRIPTION / END DESCRIPTION block - _write_description_block() # Internal - Write BEGIN DESCRIPTION / END DESCRIPTION block

RasUtils

A class containing utility functions for the ras-commander library. When integrating new functions that do not clearly fit into other classes, add them here.

Source code in ras_commander/RasUtils.py
Python
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
class RasUtils:
    """
    A class containing utility functions for the ras-commander library.
    When integrating new functions that do not clearly fit into other classes, add them here.
    """

    @staticmethod
    @log_call
    def create_directory(directory_path: Path, ras_object=None) -> Path:
        """
        Ensure that a directory exists, creating it if necessary.

        Parameters:
        directory_path (Path): Path to the directory
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Returns:
        Path: Path to the ensured directory

        Example:
        >>> ensured_dir = RasUtils.create_directory(Path("output"))
        >>> print(f"Directory ensured: {ensured_dir}")
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        path = Path(directory_path)
        try:
            path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Directory ensured: {path}")
        except Exception as e:
            logger.error(f"Failed to create directory {path}: {e}")
            raise
        return path

    @staticmethod
    def safe_resolve(path: Path) -> Path:
        """
        Resolve path while preserving Windows drive letters.

        On Windows with mapped network drives, Path.resolve() converts
        drive letters (H:\\) to UNC paths (\\\\server\\share). HEC-RAS cannot
        read from UNC paths, so we preserve the drive letter format.

        This function:
        - On non-Windows: Uses standard resolve()
        - On Windows with local drives: Uses standard resolve()
        - On Windows with mapped drives: Falls back to absolute() to preserve drive letter

        Parameters:
            path (Path): Path to resolve

        Returns:
            Path: Resolved path with drive letter preserved if applicable

        Example:
            >>> from pathlib import Path
            >>> from ras_commander import RasUtils
            >>> # Local drive - normal resolution
            >>> resolved = RasUtils.safe_resolve(Path("C:/Projects/Model.prj"))
            >>> # Mapped drive - preserves H: instead of converting to UNC
            >>> resolved = RasUtils.safe_resolve(Path("H:/Projects/Model.prj"))
        """
        # Ensure we have a Path object
        path = Path(path)

        # On non-Windows, use standard resolve
        if os.name != 'nt':
            return path.resolve()

        original_str = str(path)
        resolved = path.resolve()

        # Check if original had drive letter but resolved became UNC path
        # Drive letter format: "X:..." where X is a letter
        # UNC format: "\\..." (starts with double backslash)
        has_drive_letter = len(original_str) >= 2 and original_str[1] == ':'
        is_unc = str(resolved).startswith('\\\\')

        if has_drive_letter and is_unc:
            # Mapped network drive detected - use absolute() to preserve drive letter
            logger.debug(
                f"Mapped drive detected: {original_str} would resolve to UNC {resolved}. "
                f"Using absolute() to preserve drive letter."
            )
            return path.absolute()

        return resolved

    # Windows reserved device names (case-insensitive, without extensions)
    _WINDOWS_RESERVED_NAMES = frozenset({
        'CON', 'PRN', 'AUX', 'NUL',
        'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
        'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
    })

    @staticmethod
    def ignore_windows_reserved(
        directory: str | Path,
        contents: list[str],
    ) -> set[str]:
        """
        Ignore function for shutil.copytree that skips Windows reserved device names.

        Windows lists virtual device names (NUL, CON, PRN, etc.) in directory
        listings even though they are not real files. shutil.copytree fails
        when it tries to copy them. This function filters them out.

        Parameters:
            directory: The directory being copied (provided by copytree)
            contents: List of names in the directory (provided by copytree)

        Returns:
            set: Names to ignore (Windows reserved device names)
        """
        ignored = set()
        for name in contents:
            stem = Path(name).stem.upper()
            if stem in RasUtils._WINDOWS_RESERVED_NAMES:
                logger.debug(f"Skipping Windows reserved name: {name} in {directory}")
                ignored.add(name)
        return ignored

    @staticmethod
    def is_windows_reserved_name(name: str) -> bool:
        """
        Check if a filename is a Windows reserved device name.

        Parameters:
            name: Filename to check

        Returns:
            bool: True if the name is a reserved device name
        """
        stem = Path(name).stem.upper()
        return stem in RasUtils._WINDOWS_RESERVED_NAMES

    @staticmethod
    @log_call
    def find_files_by_extension(extension: str, ras_object=None) -> list:
        """
        List all files in the project directory with a specific extension.

        Parameters:
        extension (str): File extension to filter (e.g., '.prj')
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Returns:
        list: List of file paths matching the extension

        Example:
        >>> prj_files = RasUtils.find_files_by_extension('.prj')
        >>> print(f"Found {len(prj_files)} .prj files")
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        try:
            files = list(ras_obj.project_folder.glob(f"*{extension}"))
            file_list = [str(file) for file in files]
            logger.info(f"Found {len(file_list)} files with extension '{extension}' in {ras_obj.project_folder}")
            return file_list
        except Exception as e:
            logger.error(f"Failed to find files with extension '{extension}': {e}")
            raise

    @staticmethod
    @log_call
    def get_file_size(file_path: Path, ras_object=None) -> Optional[int]:
        """
        Get the size of a file in bytes.

        Parameters:
        file_path (Path): Path to the file
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Returns:
        Optional[int]: Size of the file in bytes, or None if the file does not exist

        Example:
        >>> size = RasUtils.get_file_size(Path("project.prj"))
        >>> print(f"File size: {size} bytes")
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        path = Path(file_path)
        if path.exists():
            try:
                size = path.stat().st_size
                logger.debug(f"Size of {path}: {size} bytes")
                return size
            except Exception as e:
                logger.error(f"Failed to get size for {path}: {e}")
                raise
        else:
            logger.warning(f"File not found: {path}")
            return None

    @staticmethod
    @log_call
    def get_file_modification_time(file_path: Path, ras_object=None) -> Optional[float]:
        """
        Get the last modification time of a file.

        Parameters:
        file_path (Path): Path to the file
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Returns:
        Optional[float]: Last modification time as a timestamp, or None if the file does not exist

        Example:
        >>> mtime = RasUtils.get_file_modification_time(Path("project.prj"))
        >>> print(f"Last modified: {mtime}")
        """

        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        path = Path(file_path)
        if path.exists():
            try:
                mtime = path.stat().st_mtime
                logger.debug(f"Last modification time of {path}: {mtime}")
                return mtime
            except Exception as e:
                logger.exception(f"Failed to get modification time for {path}")
                raise
        else:
            logger.warning(f"File not found: {path}")
            return None

    @staticmethod
    @log_call
    def normalize_ras_number(ras_number: Union[str, int, float, Path, Number]) -> str:
        """
        Normalize RAS file numbers to two-digit string format.

        HEC-RAS uses two-digit file extensions for plans (.p01), geometries (.g02),
        flows (.f03), etc. This function standardizes various input formats to ensure
        consistent file path construction.

        Parameters:
        ras_number (Union[str, int, float, Path, Number]): Input number in various formats:
            - int: 1, 2, 3, etc.
            - str: "1", "01", "001", "p01", ".p01", "project.p01", etc.
            - float: 1.0, 2.0 (must be whole numbers)
            - Path: Path("project.p05") - extracts number from extension
            - Number: numpy.int64(1), etc.

        Returns:
        str: Normalized two-digit format ("01", "02", ..., "99")

        Raises:
        ValueError: If the number is not between 1 and 99, or cannot be converted
        TypeError: If the input type is invalid

        Examples:
        >>> RasUtils.normalize_ras_number(1)
        '01'
        >>> RasUtils.normalize_ras_number("1")
        '01'
        >>> RasUtils.normalize_ras_number("01")
        '01'
        >>> RasUtils.normalize_ras_number("001")
        '01'
        >>> RasUtils.normalize_ras_number("p01")
        '01'
        >>> RasUtils.normalize_ras_number(np.int64(5))
        '05'
        >>> RasUtils.normalize_ras_number(Path("project.p02"))
        '02'

        Notes:
        - Used for plan numbers, geometry numbers, flow file numbers, etc.
        - Ensures consistent handling across all RAS file types
        - Prevents file path construction errors from unnormalized inputs
        """
        # Handle Path objects - extract number from file extension
        if isinstance(ras_number, Path):
            # Extract from extensions like .p01, .g02, .f03, etc.
            suffix = ras_number.suffix  # e.g., ".p01"
            if len(suffix) >= 2 and suffix[0] == '.':
                # Try to extract number after the letter (e.g., "01" from ".p01")
                number_part = suffix[2:]  # Skip "." and letter
                if number_part.isdigit():
                    ras_number = number_part
                else:
                    raise ValueError(
                        f"Cannot extract RAS number from Path extension: {ras_number}. "
                        f"Expected format like 'project.p01' or 'geom.g02'"
                    )
            else:
                raise ValueError(
                    f"Cannot extract RAS number from Path: {ras_number}. "
                    f"Expected file with RAS extension like .p01, .g02, etc."
                )

        # Convert to integer for validation
        try:
            # Handle string inputs including bare prefixed forms ("p01") and
            # filename/path strings ("project.p01").
            if isinstance(ras_number, str):
                text = ras_number.strip()
                path_suffix = Path(text).suffix
                if (
                    len(path_suffix) >= 3
                    and path_suffix[0] == "."
                    and path_suffix[1].isalpha()
                    and path_suffix[2:].isdigit()
                ):
                    text = path_suffix[2:]
                elif (
                    len(text) >= 2
                    and text[0] == "."
                    and text[1].isalpha()
                    and text[2:].isdigit()
                ):
                    text = text[2:]
                elif len(text) >= 2 and text[0].isalpha() and text[1:].isdigit():
                    text = text[1:]

                stripped = text.lstrip('0')
                if not stripped or not stripped.isdigit():
                    # Handle edge cases like "0", "00", or non-numeric strings
                    if not stripped:  # Was all zeros
                        ras_int = 0
                    else:
                        raise ValueError(f"Cannot convert '{ras_number}' to integer")
                else:
                    ras_int = int(stripped)
            else:
                # Handle numeric types (int, float, numpy types, etc.)
                ras_int = int(ras_number)

                # Check if float had decimal component
                if isinstance(ras_number, (float, np.floating)) and ras_number != ras_int:
                    raise ValueError(
                        f"RAS numbers must be integers, got float with decimals: {ras_number}"
                    )

        except (ValueError, TypeError) as e:
            raise ValueError(
                f"Cannot convert RAS number '{ras_number}' (type: {type(ras_number).__name__}) "
                f"to integer: {e}"
            ) from e

        # Validate range (1-99 for HEC-RAS files)
        if not 1 <= ras_int <= 99:
            raise ValueError(
                f"RAS file number must be between 1 and 99, got: {ras_int}"
            )

        # Return normalized two-digit format
        normalized = f"{ras_int:02d}"
        logger.debug(f"Normalized RAS number '{ras_number}' to '{normalized}'")
        return normalized

    @staticmethod
    @log_call
    def get_plan_path(current_plan_number_or_path: Union[str, Number, Path], ras_object=None) -> Path:
        """
        Get the path for a plan file with a given plan number or path.

        Parameters:
        current_plan_number_or_path (Union[str, Number, Path]): The plan number (e.g., '01', 1, or 1.0) or full path to the plan file
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Returns:
        Path: Full path to the plan file

        Raises:
        ValueError: If plan number is not between 1 and 99
        TypeError: If input type is invalid
        FileNotFoundError: If the plan file does not exist

        Example:
        >>> plan_path = RasUtils.get_plan_path(1)
        >>> print(f"Plan file path: {plan_path}")
        >>> plan_path = RasUtils.get_plan_path("01")
        >>> print(f"Plan file path: {plan_path}")
        >>> plan_path = RasUtils.get_plan_path("path/to/plan.p01")
        >>> print(f"Plan file path: {plan_path}")
        """
        # Validate RAS object
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        # Handle direct file path input
        plan_path = Path(current_plan_number_or_path)
        if plan_path.is_file():
            logger.debug(f"Using provided plan file path: {plan_path}")
            return plan_path

        # Handle plan number input - use centralized normalization
        try:
            current_plan_number = RasUtils.normalize_ras_number(current_plan_number_or_path)
            logger.debug(f"Normalized plan number to: {current_plan_number}")
        except (ValueError, TypeError) as e:
            logger.error(f"Invalid plan number: {current_plan_number_or_path}. {e}")
            raise

        # Construct and validate plan path
        plan_name = f"{ras_obj.project_name}.p{current_plan_number}"
        full_plan_path = ras_obj.project_folder / plan_name

        if not full_plan_path.exists():
            logger.error(f"Plan file does not exist: {full_plan_path}")
            raise FileNotFoundError(f"Plan file does not exist: {full_plan_path}")

        logger.debug(f"Constructed plan file path: {full_plan_path}")
        return full_plan_path

    @staticmethod
    @log_call
    def remove_with_retry(
        path: Path,
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        is_folder: bool = True,
        ras_object=None
    ) -> bool:
        """
        Attempts to remove a file or folder with retry logic and exponential backoff.

        Parameters:
        path (Path): Path to the file or folder to be removed.
        max_attempts (int): Maximum number of removal attempts.
        initial_delay (float): Initial delay between attempts in seconds.
        is_folder (bool): If True, the path is treated as a folder; if False, it's treated as a file.
        ras_object (RasPrj, optional): Accepted for backward compatibility. The
            cleanup does not require an initialized RAS project, so it can be used
            before project extraction or during worker-folder cleanup.

        Returns:
        bool: True if the file or folder was successfully removed, False otherwise.

        Example:
        >>> success = RasUtils.remove_with_retry(Path("temp_folder"), is_folder=True)
        >>> print(f"Removal successful: {success}")
        """
        path = Path(path)
        for attempt in range(1, max_attempts + 1):
            try:
                if path.exists():
                    if is_folder:
                        shutil.rmtree(path)
                        logger.debug(f"Folder removed: {path}")
                    else:
                        path.unlink()
                        logger.debug(f"File removed: {path}")
                else:
                    logger.debug(f"Path does not exist, nothing to remove: {path}")
                return True
            except PermissionError as pe:
                if attempt < max_attempts:
                    delay = initial_delay * (2 ** (attempt - 1))  # Exponential backoff
                    logger.warning(
                        f"PermissionError on attempt {attempt} to remove {path}: {pe}. "
                        f"Retrying in {delay} seconds..."
                    )
                    time.sleep(delay)
                else:
                    logger.error(
                        f"Failed to remove {path} after {max_attempts} attempts due to PermissionError: {pe}. Skipping."
                    )
                    return False
            except Exception as e:
                logger.exception(f"Failed to remove {path} on attempt {attempt}")
                return False
        return False

    @staticmethod
    @log_call
    def update_plan_file(
        plan_number_or_path: Union[str, Path],
        file_type: str,
        entry_number: int,
        ras_object=None
    ) -> None:
        """
        Update a plan file with a new file reference.

        Parameters:
        plan_number_or_path (Union[str, Path]): The plan number (1 to 99) or full path to the plan file
        file_type (str): Type of file to update ('Geom', 'Flow', or 'Unsteady')
        entry_number (int): Number (from 1 to 99) to set
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Raises:
        ValueError: If an invalid file_type is provided
        FileNotFoundError: If the plan file doesn't exist

        Example:
        >>> RasUtils.update_plan_file(1, "Geom", 2)
        >>> RasUtils.update_plan_file("path/to/plan.p01", "Geom", 2)
        """

        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        valid_file_types = {'Geom': 'g', 'Flow': 'f', 'Unsteady': 'u'}
        if file_type not in valid_file_types:
            logger.error(
                f"Invalid file_type '{file_type}'. Expected one of: {', '.join(valid_file_types.keys())}"
            )
            raise ValueError(
                f"Invalid file_type. Expected one of: {', '.join(valid_file_types.keys())}"
            )

        plan_file_path = Path(plan_number_or_path)
        if not plan_file_path.is_file():
            plan_file_path = RasUtils.get_plan_path(plan_number_or_path, ras_object)
            if not plan_file_path.exists():
                logger.error(f"Plan file not found: {plan_file_path}")
                raise FileNotFoundError(f"Plan file not found: {plan_file_path}")

        file_prefix = valid_file_types[file_type]
        search_pattern = f"{file_type} File="
        formatted_entry_number = f"{int(entry_number):02d}"  # Ensure two-digit format

        try:
            RasUtils.check_file_access(plan_file_path, 'r')
            with plan_file_path.open('r') as file:
                lines = file.readlines()
        except Exception as e:
            logger.exception(f"Failed to read plan file {plan_file_path}")
            raise

        updated = False
        for i, line in enumerate(lines):
            if line.startswith(search_pattern):
                lines[i] = f"{search_pattern}{file_prefix}{formatted_entry_number}\n"
                logger.info(
                    f"Updated {file_type} File in {plan_file_path} to {file_prefix}{formatted_entry_number}"
                )
                updated = True
                break

        if not updated:
            logger.warning(
                f"Search pattern '{search_pattern}' not found in {plan_file_path}. No update performed."
            )

        try:
            with plan_file_path.open('w') as file:
                file.writelines(lines)
            logger.info(f"Successfully updated plan file: {plan_file_path}")
        except Exception as e:
            logger.exception(f"Failed to write updates to plan file {plan_file_path}")
            raise

        # Refresh RasPrj dataframes
        try:
            ras_obj.plan_df = ras_obj.get_plan_entries()
            ras_obj.geom_df = ras_obj.get_geom_entries()
            ras_obj.flow_df = ras_obj.get_flow_entries()
            ras_obj.unsteady_df = ras_obj.get_unsteady_entries()
            logger.debug("RAS object dataframes have been refreshed.")
        except Exception as e:
            logger.exception("Failed to refresh RasPrj dataframes")
            raise

    @staticmethod
    @log_call
    def check_file_access(file_path: Path, mode: str = 'r') -> None:
        """
        Check if the file can be accessed with the specified mode.

        Parameters:
        file_path (Path): Path to the file
        mode (str): Mode to check ('r' for read, 'w' for write, etc.)

        Raises:
        FileNotFoundError: If the file does not exist
        PermissionError: If the required permissions are not met
        """

        path = Path(file_path)
        if not path.exists():
            logger.error(f"File not found: {file_path}")
            raise FileNotFoundError(f"File not found: {file_path}")

        if mode in ('r', 'rb'):
            if not os.access(path, os.R_OK):
                logger.error(f"Read permission denied for file: {file_path}")
                raise PermissionError(f"Read permission denied for file: {file_path}")
            else:
                logger.debug(f"Read access granted for file: {file_path}")

        if mode in ('w', 'wb', 'a', 'ab'):
            parent_dir = path.parent
            if not os.access(parent_dir, os.W_OK):
                logger.error(f"Write permission denied for directory: {parent_dir}")
                raise PermissionError(f"Write permission denied for directory: {parent_dir}")
            else:
                logger.debug(f"Write access granted for directory: {parent_dir}")


    @staticmethod
    @log_call
    def convert_to_dataframe(
        data_source: Union[pd.DataFrame, Path],
        **kwargs: Any
    ) -> pd.DataFrame:
        """
        Converts input to a pandas DataFrame. Supports existing DataFrames or file paths (CSV, Excel, TSV, Parquet).

        Args:
            data_source (Union[pd.DataFrame, Path]): The input to convert to a DataFrame. Can be a file path or an existing DataFrame.
            **kwargs: Additional keyword arguments to pass to pandas read functions.

        Returns:
            pd.DataFrame: The resulting DataFrame.

        Raises:
            NotImplementedError: If the file type is unsupported or input type is invalid.

        Example:
            >>> df = RasUtils.convert_to_dataframe(Path("data.csv"))
            >>> print(type(df))
            <class 'pandas.core.frame.DataFrame'>
        """
        if isinstance(data_source, pd.DataFrame):
            logger.debug("Input is already a DataFrame, returning a copy.")
            return data_source.copy()
        elif isinstance(data_source, Path):
            ext = data_source.suffix.replace('.', '', 1)
            logger.debug(f"Converting file with extension '{ext}' to DataFrame.")
            if ext == 'csv':
                return pd.read_csv(data_source, **kwargs)
            elif ext.startswith('x'):
                return pd.read_excel(data_source, **kwargs)
            elif ext == "tsv":
                return pd.read_csv(data_source, sep="\t", **kwargs)
            elif ext in ["parquet", "pq", "parq"]:
                return pd.read_parquet(data_source, **kwargs)
            else:
                logger.error(f"Unsupported file type: {ext}")
                raise NotImplementedError(f"Unsupported file type {ext}. Should be one of csv, tsv, parquet, or xlsx.")
        else:
            logger.error(f"Unsupported input type: {type(data_source)}")
            raise NotImplementedError(f"Unsupported type {type(data_source)}. Only file path / existing DataFrame supported at this time")

    @staticmethod
    @log_call
    def save_to_excel(
        dataframe: pd.DataFrame,
        excel_path: Path,
        **kwargs: Any
    ) -> None:
        """
        Saves a pandas DataFrame to an Excel file with retry functionality.

        Args:
            dataframe (pd.DataFrame): The DataFrame to save.
            excel_path (Path): The path to the Excel file where the DataFrame will be saved.
            **kwargs: Additional keyword arguments passed to `DataFrame.to_excel()`.

        Raises:
            IOError: If the file cannot be saved after multiple attempts.

        Example:
            >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
            >>> RasUtils.save_to_excel(df, Path('output.xlsx'))
        """
        saved = False
        max_attempts = 3
        attempt = 0

        while not saved and attempt < max_attempts:
            try:
                dataframe.to_excel(excel_path, **kwargs)
                logger.info(f'DataFrame successfully saved to {excel_path}')
                saved = True
            except IOError as e:
                attempt += 1
                if attempt < max_attempts:
                    logger.warning(f"Error saving file. Attempt {attempt} of {max_attempts}. Please close the Excel document if it's open.")
                else:
                    logger.error(f"Failed to save {excel_path} after {max_attempts} attempts.")
                    raise IOError(f"Failed to save {excel_path} after {max_attempts} attempts. Last error: {str(e)}")

    @staticmethod
    @log_call
    def calculate_rmse(observed_values: np.ndarray, predicted_values: np.ndarray, normalized: bool = True) -> float:
        """
        Calculate the Root Mean Squared Error (RMSE) between observed and predicted values.

        Args:
            observed_values (np.ndarray): Actual observations time series.
            predicted_values (np.ndarray): Estimated/predicted time series.
            normalized (bool, optional): Whether to normalize RMSE to a percentage of observed_values. Defaults to True.

        Returns:
            float: The calculated RMSE value.

        Example:
            >>> observed = np.array([1, 2, 3])
            >>> predicted = np.array([1.1, 2.2, 2.9])
            >>> RasUtils.calculate_rmse(observed, predicted)
            0.06396394
        """
        rmse = np.sqrt(np.mean((predicted_values - observed_values) ** 2))

        if normalized:
            rmse = rmse / np.abs(np.mean(observed_values))

        logger.debug(f"Calculated RMSE: {rmse}")
        return rmse

    @staticmethod
    @log_call
    def calculate_percent_bias(observed_values: np.ndarray, predicted_values: np.ndarray, as_percentage: bool = False) -> float:
        """
        Calculate the Percent Bias between observed and predicted values.

        Args:
            observed_values (np.ndarray): Actual observations time series.
            predicted_values (np.ndarray): Estimated/predicted time series.
            as_percentage (bool, optional): If True, return bias as a percentage. Defaults to False.

        Returns:
            float: The calculated Percent Bias.

        Example:
            >>> observed = np.array([1, 2, 3])
            >>> predicted = np.array([1.1, 2.2, 2.9])
            >>> RasUtils.calculate_percent_bias(observed, predicted, as_percentage=True)
            3.33333333
        """
        multiplier = 100 if as_percentage else 1

        obs_mean = np.mean(observed_values)
        if obs_mean == 0:
            logger.warning("Percent bias undefined: mean of observed values is zero")
            return np.nan

        percent_bias = multiplier * (np.mean(predicted_values) - obs_mean) / obs_mean

        logger.debug(f"Calculated Percent Bias: {percent_bias}")
        return percent_bias

    @staticmethod
    @log_call
    def calculate_error_metrics(observed_values: np.ndarray, predicted_values: np.ndarray) -> Dict[str, float]:
        """
        Compute a trio of error metrics: correlation, RMSE, and Percent Bias.

        Args:
            observed_values (np.ndarray): Actual observations time series.
            predicted_values (np.ndarray): Estimated/predicted time series.

        Returns:
            Dict[str, float]: A dictionary containing correlation ('cor'), RMSE ('rmse'), and Percent Bias ('pb').

        Example:
            >>> observed = np.array([1, 2, 3])
            >>> predicted = np.array([1.1, 2.2, 2.9])
            >>> RasUtils.calculate_error_metrics(observed, predicted)
            {'cor': 0.9993, 'rmse': 0.06396, 'pb': 0.03333}
        """
        correlation = np.corrcoef(observed_values, predicted_values)[0, 1]
        rmse = RasUtils.calculate_rmse(observed_values, predicted_values)
        percent_bias = RasUtils.calculate_percent_bias(observed_values, predicted_values)

        metrics = {'cor': correlation, 'rmse': rmse, 'pb': percent_bias}
        logger.debug(f"Calculated error metrics: {metrics}")
        return metrics


    @staticmethod
    @log_call
    def update_file(file_path: Path, update_function: Callable, *args) -> None:
        """
        Generic method to update a file.

        Parameters:
        file_path (Path): Path to the file to be updated
        update_function (Callable): Function to update the file contents
        *args: Additional arguments to pass to the update_function

        Raises:
        Exception: If there's an error updating the file

        Example:
        >>> def update_content(lines, new_value):
        ...     lines[0] = f"New value: {new_value}\\n"
        ...     return lines
        >>> RasUtils.update_file(Path("example.txt"), update_content, "Hello")
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()

            updated_lines = update_function(lines, *args) if args else update_function(lines)

            with open(file_path, 'w', encoding='utf-8', errors='replace') as f:
                f.writelines(updated_lines)
            logger.info(f"Successfully updated file: {file_path}")
        except Exception as e:
            logger.exception(f"Failed to update file {file_path}")
            raise

    @staticmethod
    @log_call
    def get_next_number(existing_numbers: list) -> str:
        """
        Determine the next available number from a list of existing numbers.

        Parameters:
        existing_numbers (list): List of existing numbers as strings

        Returns:
        str: Next available number as a zero-padded string

        Example:
        >>> RasUtils.get_next_number(["01", "02", "04"])
        "05"
        """
        existing_numbers = sorted(int(num) for num in existing_numbers)
        next_number = max(existing_numbers, default=0) + 1
        return f"{next_number:02d}"

    @staticmethod
    @log_call
    def clone_file(template_path: Path, new_path: Path, update_function: Optional[Callable] = None, *args) -> None:
        """
        Generic method to clone a file and optionally update it.

        Parameters:
        template_path (Path): Path to the template file
        new_path (Path): Path where the new file will be created
        update_function (Optional[Callable]): Function to update the cloned file
        *args: Additional arguments to pass to the update_function

        Raises:
        FileNotFoundError: If the template file doesn't exist

        Example:
        >>> def update_content(lines, new_value):
        ...     lines[0] = f"New value: {new_value}\\n"
        ...     return lines
        >>> RasUtils.clone_file(Path("template.txt"), Path("new.txt"), update_content, "Hello")
        """
        if not template_path.exists():
            logger.error(f"Template file '{template_path}' does not exist.")
            raise FileNotFoundError(f"Template file '{template_path}' does not exist.")

        shutil.copy(template_path, new_path)
        logger.info(f"File cloned from {template_path} to {new_path}")

        if update_function:
            RasUtils.update_file(new_path, update_function, *args)
    @staticmethod
    @log_call
    def update_project_file(prj_file: Path, file_type: str, new_num: str, ras_object=None) -> None:
        """
        Update the project file with a new entry.

        Parameters:
        prj_file (Path): Path to the project file
        file_type (str): Type of file being added (e.g., 'Plan', 'Geom')
        new_num (str): Number of the new file entry
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Example:
        >>> RasUtils.update_project_file(Path("project.prj"), "Plan", "02")
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        try:
            with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()

            new_line = f"{file_type} File={file_type[0].lower()}{new_num}\n"
            lines.append(new_line)

            with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
                f.writelines(lines)
            logger.info(f"Project file updated with new {file_type} entry: {new_num}")
        except Exception as e:
            logger.exception(f"Failed to update project file {prj_file}")
            raise

    # NOTE: remove_prj_entry() and rename_prj_entry() are awaiting maintainer review
    @staticmethod
    @log_call
    def remove_prj_entry(prj_file: Path, file_type: str, number: str, ras_object=None) -> None:
        """
        Remove a file entry from the .prj file.

        Parameters:
        prj_file (Path): Path to the project file
        file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow')
        number (str): Two-digit number of the entry to remove (e.g., '05')
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Example:
        >>> RasUtils.remove_prj_entry(Path("project.prj"), "Plan", "05")
        # Removes the line "Plan File=p05" from the .prj file
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        prefix = file_type[0].lower()
        target = f"{file_type} File={prefix}{number}"

        try:
            with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()

            new_lines = [line for line in lines if line.strip() != target]

            if len(new_lines) == len(lines):
                logger.warning(f"Entry '{target}' not found in {prj_file}")
                return

            with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
                f.writelines(new_lines)
            logger.info(f"Removed {file_type} entry {number} from project file")
        except Exception as e:
            logger.exception(f"Failed to remove entry from project file {prj_file}")
            raise

    @staticmethod
    @log_call
    def rename_prj_entry(prj_file: Path, file_type: str, old_number: str, new_number: str, ras_object=None) -> None:
        """
        Rename a file entry in the .prj file.

        Parameters:
        prj_file (Path): Path to the project file
        file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow')
        old_number (str): Current two-digit number (e.g., '05')
        new_number (str): New two-digit number (e.g., '02')
        ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

        Example:
        >>> RasUtils.rename_prj_entry(Path("project.prj"), "Plan", "05", "02")
        # Changes "Plan File=p05" to "Plan File=p02" in the .prj file
        """
        ras_obj = ras_object or ras
        ras_obj.check_initialized()

        prefix = file_type[0].lower()
        old_line = f"{file_type} File={prefix}{old_number}"
        new_line_content = f"{file_type} File={prefix}{new_number}"

        try:
            with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()

            found = False
            for i, line in enumerate(lines):
                if line.strip() == old_line:
                    lines[i] = new_line_content + '\n'
                    found = True
                    break

            if not found:
                logger.warning(f"Entry '{old_line}' not found in {prj_file}")
                return

            with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
                f.writelines(lines)
            logger.info(f"Renamed {file_type} entry {old_number} to {new_number} in project file")
        except Exception as e:
            logger.exception(f"Failed to rename entry in project file {prj_file}")
            raise

    @staticmethod
    @log_call
    def backup_files(
        files: List[Union[Path, str]],
        project_folder: Union[Path, str],
        operation_label: str = "deleted",
    ) -> Optional[Path]:
        """
        Move files to a timestamped Backup folder inside the project.

        Creates {project_folder}/Backup/{YYYY-MM-DD_HHMMSS}_{operation_label}/
        and moves each existing file into that folder. Non-existent files are
        silently skipped.

        Parameters:
        files (List[Union[Path, str]]): File paths to back up (str or Path).
        project_folder (Union[Path, str]): Project root where Backup/ will be created.
        operation_label (str): Label appended to timestamp folder name (e.g., "deleted_p05").

        Returns:
        Optional[Path]: Path to backup folder if any files were moved, None otherwise.

        Example:
        >>> files = [Path("Muncie.p05"), Path("Muncie.p05.hdf")]
        >>> backup_dir = RasUtils.backup_files(files, project_folder, "deleted_p05")
        """
        files = [Path(f) for f in files]
        existing_files = [f for f in files if f.exists()]
        if not existing_files:
            return None

        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        backup_folder = Path(project_folder) / "Backup" / f"{timestamp}_{operation_label}"
        backup_folder.mkdir(parents=True, exist_ok=True)

        for f in existing_files:
            shutil.move(str(f), str(backup_folder / f.name))
            logger.info(f"Backed up {f.name} to {backup_folder}")

        return backup_folder

    # From FunkShuns

    @staticmethod
    @log_call
    def decode_byte_strings(dataframe: pd.DataFrame) -> pd.DataFrame:
        """
        Decodes byte strings in a DataFrame to regular string objects.

        This function converts columns with byte-encoded strings (e.g., b'string') into UTF-8 decoded strings.

        Args:
            dataframe (pd.DataFrame): The DataFrame containing byte-encoded string columns.

        Returns:
            pd.DataFrame: The DataFrame with byte strings decoded to regular strings.

        Example:
            >>> df = pd.DataFrame({'A': [b'hello', b'world'], 'B': [1, 2]})
            >>> decoded_df = RasUtils.decode_byte_strings(df)
            >>> print(decoded_df)
                A  B
            0  hello  1
            1  world  2
        """
        str_df = dataframe.select_dtypes(['object'])
        str_df = str_df.stack().str.decode('utf-8').unstack()
        for col in str_df:
            dataframe[col] = str_df[col]
        return dataframe

    @staticmethod
    @log_call
    def perform_kdtree_query(
        reference_points: np.ndarray,
        query_points: np.ndarray,
        max_distance: float = 2.0
    ) -> np.ndarray:
        """
        Performs a KDTree query between two datasets and returns indices with distances exceeding max_distance set to -1.

        Args:
            reference_points (np.ndarray): The reference dataset for KDTree.
            query_points (np.ndarray): The query dataset to search against KDTree of reference_points.
            max_distance (float, optional): The maximum distance threshold. Indices with distances greater than this are set to -1. Defaults to 2.0.

        Returns:
            np.ndarray: Array of indices from reference_points that are nearest to each point in query_points. 
                        Indices with distances > max_distance are set to -1.

        Example:
            >>> ref_points = np.array([[0, 0], [1, 1], [2, 2]])
            >>> query_points = np.array([[0.5, 0.5], [3, 3]])
            >>> result = RasUtils.perform_kdtree_query(ref_points, query_points)
            >>> print(result)
            array([ 0, -1])
        """
        dist, snap = KDTree(reference_points).query(query_points, distance_upper_bound=max_distance)
        snap[dist > max_distance] = -1
        return snap

    @staticmethod
    @log_call
    def find_nearest_neighbors(points: np.ndarray, max_distance: float = 2.0) -> np.ndarray:
        """
        Creates a self KDTree for dataset points and finds nearest neighbors excluding self, 
        with distances above max_distance set to -1.

        Args:
            points (np.ndarray): The dataset to build the KDTree from and query against itself.
            max_distance (float, optional): The maximum distance threshold. Indices with distances 
                                            greater than max_distance are set to -1. Defaults to 2.0.

        Returns:
            np.ndarray: Array of indices representing the nearest neighbor in points for each point in points. 
                        Indices with distances > max_distance or self-matches are set to -1.

        Example:
            >>> points = np.array([[0, 0], [1, 1], [2, 2], [10, 10]])
            >>> result = RasUtils.find_nearest_neighbors(points)
            >>> print(result)
            array([1, 0, 1, -1])
        """
        dist, snap = KDTree(points).query(points, k=2, distance_upper_bound=max_distance)
        snap[dist > max_distance] = -1

        snp = pd.DataFrame(snap, index=np.arange(len(snap)))
        snp = snp.replace(-1, np.nan)
        snp.loc[snp[0] == snp.index, 0] = np.nan
        snp.loc[snp[1] == snp.index, 1] = np.nan
        filled = snp[0].fillna(snp[1])
        snapped = filled.fillna(-1).astype(np.int64).to_numpy()
        return snapped

    @staticmethod
    @log_call
    def consolidate_dataframe(
        dataframe: pd.DataFrame,
        group_by: Optional[Union[str, List[str]]] = None,
        pivot_columns: Optional[Union[str, List[str]]] = None,
        level: Optional[int] = None,
        n_dimensional: bool = False,
        aggregation_method: Union[str, Callable] = 'list'
    ) -> pd.DataFrame:
        """
        Consolidate rows in a DataFrame by merging duplicate values into lists or using a specified aggregation function.

        Args:
            dataframe (pd.DataFrame): The DataFrame to consolidate.
            group_by (Optional[Union[str, List[str]]]): Columns or indices to group by.
            pivot_columns (Optional[Union[str, List[str]]]): Columns to pivot.
            level (Optional[int]): Level of multi-index to group by.
            n_dimensional (bool): If True, use a pivot table for N-Dimensional consolidation.
            aggregation_method (Union[str, Callable]): Aggregation method, e.g., 'list' to aggregate into lists.

        Returns:
            pd.DataFrame: The consolidated DataFrame.

        Example:
            >>> df = pd.DataFrame({'A': [1, 1, 2], 'B': [4, 5, 6], 'C': [7, 8, 9]})
            >>> result = RasUtils.consolidate_dataframe(df, group_by='A')
            >>> print(result)
            B         C
            A            
            1  [4, 5]  [7, 8]
            2  [6]     [9]
        """
        if aggregation_method == 'list':
            agg_func = lambda x: tuple(x)
        else:
            agg_func = aggregation_method

        if n_dimensional:
            result = dataframe.pivot_table(group_by, pivot_columns, aggfunc=agg_func)
        else:
            result = dataframe.groupby(group_by, level=level).agg(agg_func).map(list)

        return result

    @staticmethod
    @log_call
    def find_nearest_value(array: Union[list, np.ndarray], target_value: Union[int, float]) -> Union[int, float]:
        """
        Finds the nearest value in a NumPy array to the specified target value.

        Args:
            array (Union[list, np.ndarray]): The array to search within.
            target_value (Union[int, float]): The value to find the nearest neighbor to.

        Returns:
            Union[int, float]: The nearest value in the array to the specified target value.

        Example:
            >>> arr = np.array([1, 3, 5, 7, 9])
            >>> result = RasUtils.find_nearest_value(arr, 6)
            >>> print(result)
            5
        """
        array = np.asarray(array)
        idx = (np.abs(array - target_value)).argmin()
        return array[idx]

    @staticmethod
    @log_call
    def horizontal_distance(coord1: np.ndarray, coord2: np.ndarray) -> float:
        """
        Calculate the horizontal distance between two coordinate points.

        Args:
            coord1 (np.ndarray): First coordinate point [X, Y].
            coord2 (np.ndarray): Second coordinate point [X, Y].

        Returns:
            float: Horizontal distance.

        Example:
            >>> distance = RasUtils.horizontal_distance(np.array([0, 0]), np.array([3, 4]))
            >>> print(distance)
            5.0
        """
        return np.linalg.norm(coord2 - coord1)

    @staticmethod
    def find_valid_ras_folders(
        search_path: Union[str, Path],
        max_depth: Optional[int] = None,
        return_project_info: bool = False
    ) -> Union[List[Path], List[Dict[str, Any]]]:
        """
        Recursively search for valid HEC-RAS project folders.

        A valid HEC-RAS project folder contains:
        1. A .prj file with "Proj Title=" on the first line (HEC-RAS project file)
        2. At least one .pXX file where XX is 01-99 (plan files)

        This function does NOT require the global ras object to be initialized,
        making it suitable for discovery operations before project initialization.

        Args:
            search_path (Union[str, Path]): Root directory to search for HEC-RAS projects.
            max_depth (Optional[int]): Maximum folder depth to search. None means unlimited.
                Depth 0 = search_path only, 1 = immediate subdirectories, etc.
            return_project_info (bool): If True, return list of dicts with folder path,
                project name, prj file path, and plan count. If False, return list of Paths.

        Returns:
            Union[List[Path], List[Dict[str, Any]]]:
                - If return_project_info=False: List of Path objects for valid HEC-RAS folders
                - If return_project_info=True: List of dicts with keys:
                    - 'folder': Path to the project folder
                    - 'project_name': Name extracted from .prj filename
                    - 'prj_file': Path to the .prj file
                    - 'plan_count': Number of plan files found
                    - 'plan_numbers': List of plan numbers (e.g., ['01', '02', '15'])

        Example:
            >>> # Find all valid HEC-RAS project folders
            >>> folders = RasUtils.find_valid_ras_folders("C:/Projects/Hydrology")
            >>> for folder in folders:
            ...     print(f"Found project: {folder}")

            >>> # Get detailed info about each project
            >>> projects = RasUtils.find_valid_ras_folders(
            ...     "C:/Projects",
            ...     max_depth=3,
            ...     return_project_info=True
            ... )
            >>> for proj in projects:
            ...     print(f"{proj['project_name']}: {proj['plan_count']} plans")

        Note:
            This function distinguishes HEC-RAS .prj files from ESRI projection files
            by checking for "Proj Title=" on the first line of the file.
        """
        search_path = Path(search_path)
        if not search_path.exists():
            logger.warning(f"Search path does not exist: {search_path}")
            return []

        if not search_path.is_dir():
            logger.warning(f"Search path is not a directory: {search_path}")
            return []

        valid_folders = []

        def is_valid_ras_prj(prj_file: Path) -> bool:
            """Check if a .prj file is a valid HEC-RAS project file."""
            try:
                with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                    first_line = f.readline()
                    return first_line.strip().startswith("Proj Title=")
            except Exception as e:
                logger.debug(f"Could not read .prj file {prj_file}: {e}")
                return False

        def get_plan_files(folder: Path) -> List[Tuple[str, Path]]:
            """Get all valid plan files (.p01 to .p99) in a folder."""
            plan_files = []
            for i in range(1, 100):
                plan_num = f"{i:02d}"
                # Look for files matching *.pXX pattern
                for pfile in folder.glob(f"*.p{plan_num}"):
                    plan_files.append((plan_num, pfile))
            return plan_files

        def check_folder(folder: Path) -> Optional[Dict[str, Any]]:
            """Check if a folder is a valid HEC-RAS project folder."""
            # Find .prj files
            prj_files = list(folder.glob("*.prj"))

            if not prj_files:
                return None

            # Find valid HEC-RAS .prj file (not ESRI projection file)
            valid_prj = None
            for prj_file in prj_files:
                if is_valid_ras_prj(prj_file):
                    valid_prj = prj_file
                    break

            if valid_prj is None:
                return None

            # Check for plan files
            plan_files = get_plan_files(folder)
            if not plan_files:
                return None

            # This is a valid HEC-RAS project folder
            return {
                'folder': folder,
                'project_name': valid_prj.stem,
                'prj_file': valid_prj,
                'plan_count': len(plan_files),
                'plan_numbers': [pn for pn, _ in plan_files]
            }

        def scan_directory(current_path: Path, current_depth: int):
            """Recursively scan directories for HEC-RAS projects."""
            # Check if we've exceeded max depth
            if max_depth is not None and current_depth > max_depth:
                return

            # Check current folder
            result = check_folder(current_path)
            if result:
                valid_folders.append(result)
                # Don't search subdirectories of a valid project folder
                # (nested projects are uncommon and would cause confusion)
                return

            # Scan subdirectories
            try:
                for item in current_path.iterdir():
                    if item.is_dir() and not item.name.startswith('.'):
                        scan_directory(item, current_depth + 1)
            except PermissionError:
                logger.debug(f"Permission denied accessing: {current_path}")
            except Exception as e:
                logger.debug(f"Error scanning {current_path}: {e}")

        # Start scanning
        logger.info(f"Searching for HEC-RAS projects in: {search_path}")
        scan_directory(search_path, 0)
        logger.info(f"Found {len(valid_folders)} valid HEC-RAS project folders")

        if return_project_info:
            return valid_folders
        else:
            return [info['folder'] for info in valid_folders]

    @staticmethod
    def is_valid_ras_folder(folder_path: Union[str, Path]) -> bool:
        """
        Check if a single folder is a valid HEC-RAS project folder.

        A valid HEC-RAS project folder contains:
        1. A .prj file with "Proj Title=" on the first line
        2. At least one .pXX file where XX is 01-99

        This function does NOT require the global ras object to be initialized.

        Args:
            folder_path (Union[str, Path]): Path to the folder to check.

        Returns:
            bool: True if the folder is a valid HEC-RAS project folder.

        Example:
            >>> if RasUtils.is_valid_ras_folder("C:/Projects/MyRASModel"):
            ...     print("This is a valid HEC-RAS project folder")
            ... else:
            ...     print("Not a valid HEC-RAS project folder")
        """
        folder_path = Path(folder_path)
        if not folder_path.exists() or not folder_path.is_dir():
            return False

        # Find .prj files
        prj_files = list(folder_path.glob("*.prj"))
        if not prj_files:
            return False

        # Check if any .prj file is a valid HEC-RAS project file
        def is_valid_ras_prj(prj_file: Path) -> bool:
            try:
                with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                    first_line = f.readline()
                    return first_line.strip().startswith("Proj Title=")
            except Exception:
                return False

        has_valid_prj = any(is_valid_ras_prj(pf) for pf in prj_files)
        if not has_valid_prj:
            return False

        # Check for at least one plan file (.p01 to .p99)
        for i in range(1, 100):
            plan_num = f"{i:02d}"
            if list(folder_path.glob(f"*.p{plan_num}")):
                return True

        return False

    # =============================================================================
    # Atomic File Write Infrastructure (Phase 2.1 - HTAB Modification)
    # =============================================================================

    @staticmethod
    @log_call
    def safe_write_geometry(
        geom_file: Union[str, Path],
        modified_lines: List[str],
        create_backup: bool = True
    ) -> Optional[Path]:
        """
        Atomically write geometry file with backup for safe file modification.

        This function implements safe file modification for HEC-RAS geometry files,
        ensuring data integrity through atomic operations and optional backup creation.

        Process:
            1. Create timestamped backup: geom_file.YYYYMMDD_HHMMSS.bak
            2. Write to temp file: geom_file.tmp
            3. Basic validation (line count reasonable, file size reasonable)
            4. Atomic rename temp -> original (os.replace)
            5. Return backup path

        Parameters:
            geom_file (Union[str, Path]): Path to the geometry file to write.
            modified_lines (List[str]): List of lines to write to the file.
                Each line should include newline characters if needed.
            create_backup (bool): If True, create timestamped backup before modification.
                Defaults to True for safety.

        Returns:
            Optional[Path]: Path to backup file if create_backup=True and successful,
                None if create_backup=False or file didn't exist before.

        Raises:
            FileNotFoundError: If the geometry file doesn't exist (for modification).
            PermissionError: If write access is denied to the file or directory.
            ValueError: If modified_lines is empty or validation fails.
            IOError: If atomic rename fails.

        Example:
            >>> from ras_commander import RasUtils
            >>> from pathlib import Path
            >>>
            >>> # Read geometry file
            >>> geom_file = Path("project/geometry.g01")
            >>> with open(geom_file, 'r') as f:
            ...     lines = f.readlines()
            >>>
            >>> # Modify HTAB parameters (example)
            >>> modified_lines = modify_htab_params(lines, starting_el=580.0)
            >>>
            >>> # Safe write with backup
            >>> backup_path = RasUtils.safe_write_geometry(geom_file, modified_lines)
            >>> print(f"Backup created at: {backup_path}")

        Notes:
            - This function uses os.replace() for atomic rename, which is atomic on
              both Windows (NTFS) and Unix filesystems.
            - Backup files use format: filename.YYYYMMDD_HHMMSS.bak
            - If validation fails, temp file is deleted and original remains unchanged.
            - For rollback, use rollback_geometry() with the returned backup path.

        See Also:
            - rollback_geometry: Restore from backup after failed modification
            - .claude/rules/python/path-handling.md: Path handling patterns
        """
        geom_file = Path(geom_file)
        backup_path = None
        temp_path = None

        # Validate inputs
        if not modified_lines:
            raise ValueError("modified_lines cannot be empty")

        # Verify original file exists (we're modifying, not creating)
        if not geom_file.exists():
            raise FileNotFoundError(f"Geometry file not found: {geom_file}")

        # Check write permissions
        if not os.access(geom_file.parent, os.W_OK):
            raise PermissionError(f"Write permission denied for directory: {geom_file.parent}")

        try:
            # Read original file for validation comparison
            original_size = geom_file.stat().st_size
            with open(geom_file, 'r', encoding='utf-8', errors='replace') as f:
                original_line_count = sum(1 for _ in f)

            # Step 1: Create timestamped backup
            if create_backup:
                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_path = geom_file.parent / f"{geom_file.name}.{timestamp}.bak"

                # Copy original to backup
                shutil.copy2(geom_file, backup_path)
                logger.info(f"Backup created: {backup_path}")

            # Step 2: Write to temp file
            temp_path = geom_file.parent / f"{geom_file.name}.tmp"
            with open(temp_path, 'w', encoding='utf-8', newline='') as f:
                f.writelines(modified_lines)
            logger.debug(f"Temp file written: {temp_path}")

            # Step 3: Basic validation
            temp_size = temp_path.stat().st_size
            new_line_count = len(modified_lines)

            # Validation: File shouldn't be empty
            if temp_size == 0:
                raise ValueError("Modified file would be empty - validation failed")

            # Validation: Line count shouldn't change drastically (>50% reduction suspicious)
            if new_line_count < original_line_count * 0.5:
                raise ValueError(
                    f"Line count reduced drastically ({original_line_count} -> {new_line_count}). "
                    f"This may indicate data corruption. Aborting."
                )

            # Validation: File size shouldn't shrink too much (>80% reduction suspicious)
            if temp_size < original_size * 0.2:
                raise ValueError(
                    f"File size reduced drastically ({original_size} -> {temp_size} bytes). "
                    f"This may indicate data corruption. Aborting."
                )

            logger.debug(
                f"Validation passed: {new_line_count} lines, {temp_size} bytes "
                f"(original: {original_line_count} lines, {original_size} bytes)"
            )

            # Step 4: Atomic rename temp -> original
            # os.replace() is atomic on both Windows (NTFS) and Unix
            os.replace(temp_path, geom_file)
            temp_path = None  # Mark as successfully moved
            logger.info(f"Geometry file atomically updated: {geom_file}")

            return backup_path

        except Exception as e:
            # Clean up temp file if it exists
            if temp_path and temp_path.exists():
                try:
                    temp_path.unlink()
                    logger.debug(f"Cleaned up temp file: {temp_path}")
                except Exception as cleanup_error:
                    logger.warning(f"Failed to clean up temp file {temp_path}: {cleanup_error}")

            logger.error(f"Failed to write geometry file {geom_file}: {e}")
            raise

    @staticmethod
    @log_call
    def rollback_geometry(
        geom_file: Union[str, Path],
        backup_path: Union[str, Path]
    ) -> None:
        """
        Restore geometry file from backup after failed modification.

        This function restores a geometry file from a previously created backup,
        typically used when a modification operation fails or produces incorrect results.

        Process:
            1. Verify backup file exists
            2. Copy backup -> original (preserves backup for safety)
            3. Log restoration

        Parameters:
            geom_file (Union[str, Path]): Path to the geometry file to restore.
            backup_path (Union[str, Path]): Path to the backup file created by
                safe_write_geometry().

        Returns:
            None

        Raises:
            FileNotFoundError: If backup file doesn't exist.
            PermissionError: If write access is denied.
            IOError: If copy operation fails.

        Example:
            >>> from ras_commander import RasUtils
            >>> from pathlib import Path
            >>>
            >>> # Attempt modification
            >>> try:
            ...     backup = RasUtils.safe_write_geometry(geom_file, modified_lines)
            ...     # Run HEC-RAS to validate
            ...     RasCmdr.compute_plan("01", clear_geompre=True)
            ... except Exception as e:
            ...     # Modification failed - rollback
            ...     if backup:
            ...         RasUtils.rollback_geometry(geom_file, backup)
            ...         print("Geometry file restored from backup")
            ...     raise

        Notes:
            - This function copies the backup to original, preserving the backup.
            - Use shutil.copy2() to preserve file metadata (timestamps, permissions).
            - After successful rollback, you may want to delete the backup manually
              if no longer needed.

        See Also:
            - safe_write_geometry: Create backup and safely write modifications
        """
        geom_file = Path(geom_file)
        backup_path = Path(backup_path)

        # Verify backup exists
        if not backup_path.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_path}")

        # Check write permissions
        if geom_file.exists() and not os.access(geom_file, os.W_OK):
            raise PermissionError(f"Write permission denied for file: {geom_file}")

        if not os.access(geom_file.parent, os.W_OK):
            raise PermissionError(f"Write permission denied for directory: {geom_file.parent}")

        try:
            # Copy backup to original (preserves backup for safety)
            shutil.copy2(backup_path, geom_file)
            logger.info(f"Geometry file restored from backup: {geom_file} <- {backup_path}")

        except Exception as e:
            logger.error(f"Failed to restore geometry file {geom_file} from {backup_path}: {e}")
            raise

    @staticmethod
    @log_call
    def validate_geometry_file_basic(
        geom_file: Union[str, Path],
        min_lines: int = 10,
        required_patterns: Optional[List[str]] = None
    ) -> bool:
        """
        Perform basic validation on a geometry file.

        This function checks that a geometry file meets basic structural requirements,
        useful for pre-modification validation or post-write verification.

        Parameters:
            geom_file (Union[str, Path]): Path to the geometry file to validate.
            min_lines (int): Minimum number of lines expected. Defaults to 10.
            required_patterns (Optional[List[str]]): List of strings that must appear
                somewhere in the file. Defaults to ["River Reach="] for HEC-RAS geometry.

        Returns:
            bool: True if validation passes, False otherwise.

        Example:
            >>> if RasUtils.validate_geometry_file_basic(geom_file):
            ...     print("Geometry file appears valid")
            >>>
            >>> # Custom validation
            >>> if RasUtils.validate_geometry_file_basic(
            ...     geom_file,
            ...     required_patterns=["River Reach=", "Type RM Length"]
            ... ):
            ...     print("Geometry file has cross sections")

        Notes:
            - This is a basic structural check, not a full HEC-RAS validation.
            - For comprehensive validation, use HEC-RAS geometric preprocessor.
        """
        geom_file = Path(geom_file)

        if required_patterns is None:
            # Default: Check for River Reach definition (present in most geometry files)
            required_patterns = ["River Reach="]

        if not geom_file.exists():
            logger.warning(f"Geometry file does not exist: {geom_file}")
            return False

        try:
            with open(geom_file, 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()
                lines = content.splitlines()

            # Check minimum line count
            if len(lines) < min_lines:
                logger.warning(
                    f"Geometry file has too few lines: {len(lines)} < {min_lines}"
                )
                return False

            # Check required patterns
            for pattern in required_patterns:
                if pattern not in content:
                    logger.warning(f"Required pattern not found in geometry file: {pattern}")
                    return False

            logger.debug(f"Geometry file validation passed: {geom_file}")
            return True

        except Exception as e:
            logger.error(f"Error validating geometry file {geom_file}: {e}")
            return False

    @staticmethod
    def _read_description_block(file_path: Union[str, Path]) -> str:
        """
        Read the BEGIN DESCRIPTION / END DESCRIPTION block from any HEC-RAS text file.

        HEC-RAS uses the same description block format in plan (.p##), geometry (.g##),
        unsteady (.u##), and steady flow (.f##) files.

        Parameters:
            file_path (Union[str, Path]): Path to the HEC-RAS text file.

        Returns:
            str: The description text, or empty string if no description block found.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            logger.warning(f"File not found for description read: {file_path}")
            return ""

        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()
        except IOError as e:
            logger.error(f"Error reading file {file_path}: {e}")
            return ""

        description_lines = []
        in_description = False
        for line in lines:
            stripped_upper = line.strip().upper()
            if stripped_upper in ('BEGIN DESCRIPTION:', 'BEGIN DESCRIPTION'):
                in_description = True
            elif stripped_upper in ('END DESCRIPTION:', 'END DESCRIPTION'):
                break
            elif in_description:
                description_lines.append(line.strip())

        return '\n'.join(description_lines)

    @staticmethod
    def _write_description_block(
        file_path: Union[str, Path],
        description: str,
        title_keyword: str
    ) -> bool:
        """
        Write a BEGIN DESCRIPTION / END DESCRIPTION block into any HEC-RAS text file.

        If an existing description block is found, it is replaced in place.
        If no description block exists, a new one is inserted after the title and
        Program Version lines.

        Parameters:
            file_path (Union[str, Path]): Path to the HEC-RAS text file.
            description (str): Description text to write.
            title_keyword (str): The title keyword for this file type, e.g.
                'Plan Title', 'Geom Title', 'Flow Title'. Used to determine
                insertion point when no existing description block is found.

        Returns:
            bool: True if successful, False otherwise.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            logger.error(f"File not found for description write: {file_path}")
            return False

        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                lines = f.readlines()
        except IOError as e:
            logger.error(f"Error reading file {file_path}: {e}")
            return False

        # Find existing description block
        desc_start_idx = None
        desc_end_idx = None
        for i, line in enumerate(lines):
            stripped_upper = line.strip().upper()
            if stripped_upper.startswith('BEGIN DESCRIPTION'):
                desc_start_idx = i
            elif stripped_upper.startswith('END DESCRIPTION'):
                desc_end_idx = i
                break

        # Prepare the new description block
        description_clean = description.rstrip()
        description_block = [
            'BEGIN DESCRIPTION:\n',
            description_clean + '\n',
            'END DESCRIPTION:\n'
        ]

        if desc_start_idx is not None and desc_end_idx is not None:
            # Replace existing description block in place
            new_lines = lines[:desc_start_idx] + description_block + lines[desc_end_idx + 1:]
        else:
            # Find insertion point: after title_keyword= and Program Version= lines
            last_header_idx = 0
            for i, line in enumerate(lines):
                stripped = line.strip()
                if stripped.startswith(f'{title_keyword}=') or stripped.startswith('Program Version='):
                    last_header_idx = max(last_header_idx, i)
            insertion_idx = last_header_idx + 1
            new_lines = lines[:insertion_idx] + description_block + lines[insertion_idx:]

        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.writelines(new_lines)
            logger.info(f"Updated description in {file_path}")
            return True
        except IOError as e:
            logger.error(f"Error writing description to {file_path}: {e}")
            return False

    @staticmethod
    @log_call
    def dos2unix(project_dir: Union[str, Path], extensions: Optional[List[str]] = None) -> int:
        """
        Convert CRLF line endings to LF in HEC-RAS text files.

        Processes .b## and .g## files by default (boundary and geometry text files
        that need LF endings for Linux HEC-RAS execution). Done in-place using
        pure Python (no shell dependency).

        Attribution: Implementation pattern derived from ras-agent
        (https://github.com/gheistand/ras-agent) by Glenn Heistand / CHAMP —
        Illinois State Water Survey. See runner.py:_dos2unix_dir().

        Parameters:
            project_dir (Union[str, Path]): Path to the HEC-RAS project directory.
            extensions (Optional[List[str]]): Custom regex patterns for file extensions
                to process. Defaults to [r'\\.(b|g)\\d+$'] which matches .b01, .g01, etc.

        Returns:
            int: Number of files modified.

        Example:
            >>> from ras_commander import RasUtils
            >>> count = RasUtils.dos2unix(Path("/project/dir"))
            >>> print(f"Converted {count} files")
        """
        import re

        project_dir = Path(project_dir)
        if not project_dir.is_dir():
            raise FileNotFoundError(f"Directory not found: {project_dir}")

        if extensions is None:
            patterns = [re.compile(r'\.(b|g)\d+$', re.IGNORECASE)]
        else:
            patterns = [re.compile(ext, re.IGNORECASE) for ext in extensions]

        modified_count = 0
        for fpath in project_dir.iterdir():
            if not fpath.is_file():
                continue
            if not any(p.search(fpath.name) for p in patterns):
                continue
            try:
                raw = fpath.read_bytes()
                if b'\r' in raw:
                    fpath.write_bytes(raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n'))
                    modified_count += 1
                    logger.debug(f"dos2unix: {fpath.name}")
            except (OSError, PermissionError) as exc:
                logger.warning(f"dos2unix skipped {fpath.name}: {exc}")

        logger.info(f"dos2unix: converted {modified_count} files in {project_dir}")
        return modified_count

    @staticmethod
    def _scan_native_linux_ras(roots) -> Dict[str, Path]:
        """Scan native-Linux HEC-RAS install roots for RasUnsteady solver binaries.

        A native Linux install has no Ras.exe; the executable is the RasUnsteady
        solver (under ``bin_ras/`` for some 5.0.x layouts). Returns a mapping of
        ``{version-folder-name: Path(RasUnsteady)}``. Platform-agnostic so it is
        directly unit-testable (CLB-883).
        """
        found: Dict[str, Path] = {}
        for root in roots:
            root = Path(root)
            if not root.exists():
                continue
            for child in sorted(root.iterdir()):
                if not child.is_dir():
                    continue
                exe = None
                for binname in ("RasUnsteady", "rasUnsteady", "rasUnsteady64"):
                    for cand in (child / binname, child / "bin_ras" / binname):
                        if cand.is_file():
                            exe = cand
                            break
                    if exe is not None:
                        break
                if exe is not None:
                    found.setdefault(child.name, exe)
        return found

    @staticmethod
    @log_call
    def discover_ras_versions() -> Dict[str, Path]:
        """
        Discover installed HEC-RAS versions by scanning Windows Registry,
        filesystem, and Wine prefixes (on Linux).

        Resolution order:
        1. Windows Registry (HKLM, WOW6432Node, HKCU) -- Windows only
        2. Standard filesystem paths (Program Files) -- Windows only
        3. Native Linux installs (/opt/hecras/<ver>, /opt/HEC-RAS/<ver>,
           ~/hecras/<ver>, or $RAS_COMMANDER_LINUX_RAS_ROOT) -- Linux only
        4. Wine prefix paths (~/.wine, /opt/hecras-wine, etc.) -- Linux only

        Returns:
            Dict[str, Path]: Mapping of version string -> Path to the executable.
            On Windows/Wine this is ``Ras.exe``; for native Linux installs there
            is no Ras.exe, so it maps to the ``RasUnsteady`` solver binary (use
            ``.parent`` as ``ras_exe_dir`` for ``RasCmdr.compute_plan_linux``).
            Example: {"6.6": Path("C:/Program Files (x86)/HEC/HEC-RAS/6.6/Ras.exe")}
        """
        discovered: Dict[str, Path] = {}

        # Version folder names matching RasPrj.get_ras_exe()
        ras_version_folders = [
            "7.0", "6.7 Beta 5", "6.7 Beta 4", "6.6", "6.5", "6.4.1", "6.3.1", "6.3", "6.2",
            "6.1", "6.0", "5.0.7", "5.0.6", "5.0.5", "5.0.4", "5.0.3",
            "5.0.1", "5.0", "4.1.0", "4.0"
        ]

        version_aliases = {
            "4.1": "4.1.0", "41": "4.1.0", "410": "4.1.0",
            "40": "4.0", "50": "5.0", "501": "5.0.1", "503": "5.0.3",
            "504": "5.0.4", "505": "5.0.5", "506": "5.0.6", "507": "5.0.7",
            "60": "6.0", "61": "6.1", "62": "6.2", "63": "6.3",
            "631": "6.3.1", "6.4": "6.4.1", "64": "6.4.1", "641": "6.4.1",
            "65": "6.5", "66": "6.6", "6.7": "6.7 Beta 5", "67": "6.7 Beta 5",
            "70": "7.0",
        }

        def _normalize_version(raw: str, install_dir: Optional[Path] = None) -> str:
            v = str(raw).strip()
            if v in version_aliases:
                return version_aliases[v]
            if install_dir is not None:
                fn = install_dir.name.strip()
                if fn in version_aliases:
                    return version_aliases[fn]
                if fn in ras_version_folders:
                    return fn
            return v

        def _add(version: str, exe_path: Path, source: str) -> None:
            if version in discovered:
                logger.debug(f"Skipping duplicate HEC-RAS {version} from {source}")
                return
            discovered[version] = exe_path
            logger.info(f"Discovered HEC-RAS {version} at {exe_path} via {source}")

        def _scan_root(root_dir: Path, source_label: str) -> None:
            """Scan a directory containing versioned HEC-RAS subfolders."""
            if not root_dir.exists():
                return
            # Check known folder names first
            for folder_name in ras_version_folders:
                exe = root_dir / folder_name / "Ras.exe"
                if exe.is_file():
                    v = _normalize_version(folder_name, exe.parent)
                    _add(v, exe, source_label)
            # Glob for any other folders with Ras.exe
            try:
                for exe in sorted(root_dir.glob("*/Ras.exe")):
                    v = _normalize_version(exe.parent.name, exe.parent)
                    _add(v, exe, source_label)
            except OSError as exc:
                logger.warning(f"Filesystem scan failed for {root_dir}: {exc}")

        # --- Windows: Registry + Program Files ---
        if os.name == 'nt':
            # Registry scan
            try:
                import winreg

                def _is_no_more(exc: OSError) -> bool:
                    return getattr(exc, "winerror", None) == 259

                hive_map = {
                    "HKLM": winreg.HKEY_LOCAL_MACHINE,
                    "HKCU": winreg.HKEY_CURRENT_USER,
                }
                registry_locations = [
                    ("HKLM", r"SOFTWARE\HEC\HEC-RAS"),
                    ("HKLM", r"SOFTWARE\WOW6432Node\HEC\HEC-RAS"),
                    ("HKCU", r"SOFTWARE\HEC\HEC-RAS"),
                ]
                install_value_names = (
                    "InstallDir", "InstallPath", "Install Path",
                    "Path", "ExePath", "RasExePath",
                )

                for hive_name, subkey_path in registry_locations:
                    try:
                        with winreg.OpenKey(hive_map[hive_name], subkey_path) as root_key:
                            idx = 0
                            while True:
                                try:
                                    vk_name = winreg.EnumKey(root_key, idx)
                                except OSError as exc:
                                    if _is_no_more(exc):
                                        break
                                    break
                                idx += 1
                                try:
                                    with winreg.OpenKey(root_key, vk_name) as vk:
                                        install_val = None
                                        for val_name in install_value_names:
                                            try:
                                                val, _ = winreg.QueryValueEx(vk, val_name)
                                                if val:
                                                    install_val = str(val)
                                                    break
                                            except (FileNotFoundError, OSError):
                                                continue
                                        if install_val:
                                            p = Path(os.path.expandvars(install_val.strip().strip('"')))
                                            if p.suffix.lower() != '.exe':
                                                p = p / "Ras.exe"
                                            if p.name.lower() == "ras.exe" and p.is_file():
                                                v = _normalize_version(vk_name, p.parent)
                                                _add(v, p, f"registry {hive_name}\\{subkey_path}")
                                except (FileNotFoundError, OSError):
                                    continue
                    except (FileNotFoundError, OSError):
                        continue
            except ImportError:
                logger.debug("winreg not available, skipping registry scan")

            # Filesystem scan (standard Windows paths)
            _scan_root(Path("C:/Program Files (x86)/HEC/HEC-RAS"), "filesystem (x86)")
            _scan_root(Path("C:/Program Files/HEC/HEC-RAS"), "filesystem")

        # --- Linux: native install scan ---
        else:
            # Native Linux HEC-RAS installs have no Ras.exe; the RasUnsteady
            # solver binary is the executable. Maps version -> RasUnsteady path
            # (callers for compute_plan_linux use ``.parent`` as ras_exe_dir).
            # Roots are configurable via $RAS_COMMANDER_LINUX_RAS_ROOT (CLB-883).
            linux_native_roots = [
                Path(os.path.expanduser("~/hecras")),
                Path("/opt/hecras"),
                Path("/opt/HEC-RAS"),
            ]
            env_root = os.environ.get("RAS_COMMANDER_LINUX_RAS_ROOT")
            if env_root:
                linux_native_roots.insert(0, Path(env_root))
            for _folder, _exe in RasUtils._scan_native_linux_ras(linux_native_roots).items():
                _add(_normalize_version(_folder, _exe.parent), _exe, "linux native")

            # --- Linux: Wine prefix scan ---
            wine_prefix_candidates = [
                Path(os.path.expanduser("~/.wine")),
                Path("/opt/hecras-wine"),
                Path(os.path.expanduser("~/hecras-wine")),
            ]
            # Also check WINEPREFIX env var
            env_prefix = os.environ.get("WINEPREFIX")
            if env_prefix:
                wine_prefix_candidates.insert(0, Path(env_prefix))

            for prefix in wine_prefix_candidates:
                drive_c = prefix / "drive_c"
                if not drive_c.exists():
                    continue
                logger.debug(f"Scanning Wine prefix: {prefix}")
                # Standard HEC-RAS locations under drive_c
                _scan_root(drive_c / "Program Files (x86)" / "HEC" / "HEC-RAS", f"wine {prefix}")
                _scan_root(drive_c / "Program Files" / "HEC" / "HEC-RAS", f"wine {prefix}")
                _scan_root(drive_c / "HEC-RAS", f"wine {prefix}")

        logger.info(f"Discovered {len(discovered)} installed HEC-RAS version(s)")
        return discovered
create_directory staticmethod
Python
create_directory(directory_path: Path, ras_object=None) -> Path

Ensure that a directory exists, creating it if necessary.

Parameters: directory_path (Path): Path to the directory ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Returns: Path: Path to the ensured directory

Example:

ensured_dir = RasUtils.create_directory(Path("output")) print(f"Directory ensured: {ensured_dir}")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def create_directory(directory_path: Path, ras_object=None) -> Path:
    """
    Ensure that a directory exists, creating it if necessary.

    Parameters:
    directory_path (Path): Path to the directory
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Returns:
    Path: Path to the ensured directory

    Example:
    >>> ensured_dir = RasUtils.create_directory(Path("output"))
    >>> print(f"Directory ensured: {ensured_dir}")
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    path = Path(directory_path)
    try:
        path.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Directory ensured: {path}")
    except Exception as e:
        logger.error(f"Failed to create directory {path}: {e}")
        raise
    return path
safe_resolve staticmethod
Python
safe_resolve(path: Path) -> Path

Resolve path while preserving Windows drive letters.

On Windows with mapped network drives, Path.resolve() converts drive letters (H:) to UNC paths (\server\share). HEC-RAS cannot read from UNC paths, so we preserve the drive letter format.

This function: - On non-Windows: Uses standard resolve() - On Windows with local drives: Uses standard resolve() - On Windows with mapped drives: Falls back to absolute() to preserve drive letter

Parameters:

Name Type Description Default
path Path

Path to resolve

required

Returns:

Name Type Description
Path Path

Resolved path with drive letter preserved if applicable

Example

from pathlib import Path from ras_commander import RasUtils

Local drive - normal resolution

resolved = RasUtils.safe_resolve(Path("C:/Projects/Model.prj"))

Mapped drive - preserves H: instead of converting to UNC

resolved = RasUtils.safe_resolve(Path("H:/Projects/Model.prj"))

Source code in ras_commander/RasUtils.py
Python
@staticmethod
def safe_resolve(path: Path) -> Path:
    """
    Resolve path while preserving Windows drive letters.

    On Windows with mapped network drives, Path.resolve() converts
    drive letters (H:\\) to UNC paths (\\\\server\\share). HEC-RAS cannot
    read from UNC paths, so we preserve the drive letter format.

    This function:
    - On non-Windows: Uses standard resolve()
    - On Windows with local drives: Uses standard resolve()
    - On Windows with mapped drives: Falls back to absolute() to preserve drive letter

    Parameters:
        path (Path): Path to resolve

    Returns:
        Path: Resolved path with drive letter preserved if applicable

    Example:
        >>> from pathlib import Path
        >>> from ras_commander import RasUtils
        >>> # Local drive - normal resolution
        >>> resolved = RasUtils.safe_resolve(Path("C:/Projects/Model.prj"))
        >>> # Mapped drive - preserves H: instead of converting to UNC
        >>> resolved = RasUtils.safe_resolve(Path("H:/Projects/Model.prj"))
    """
    # Ensure we have a Path object
    path = Path(path)

    # On non-Windows, use standard resolve
    if os.name != 'nt':
        return path.resolve()

    original_str = str(path)
    resolved = path.resolve()

    # Check if original had drive letter but resolved became UNC path
    # Drive letter format: "X:..." where X is a letter
    # UNC format: "\\..." (starts with double backslash)
    has_drive_letter = len(original_str) >= 2 and original_str[1] == ':'
    is_unc = str(resolved).startswith('\\\\')

    if has_drive_letter and is_unc:
        # Mapped network drive detected - use absolute() to preserve drive letter
        logger.debug(
            f"Mapped drive detected: {original_str} would resolve to UNC {resolved}. "
            f"Using absolute() to preserve drive letter."
        )
        return path.absolute()

    return resolved
ignore_windows_reserved staticmethod
Python
ignore_windows_reserved(directory: str | Path, contents: list[str]) -> set[str]

Ignore function for shutil.copytree that skips Windows reserved device names.

Windows lists virtual device names (NUL, CON, PRN, etc.) in directory listings even though they are not real files. shutil.copytree fails when it tries to copy them. This function filters them out.

Parameters:

Name Type Description Default
directory str | Path

The directory being copied (provided by copytree)

required
contents list[str]

List of names in the directory (provided by copytree)

required

Returns:

Name Type Description
set set[str]

Names to ignore (Windows reserved device names)

Source code in ras_commander/RasUtils.py
Python
@staticmethod
def ignore_windows_reserved(
    directory: str | Path,
    contents: list[str],
) -> set[str]:
    """
    Ignore function for shutil.copytree that skips Windows reserved device names.

    Windows lists virtual device names (NUL, CON, PRN, etc.) in directory
    listings even though they are not real files. shutil.copytree fails
    when it tries to copy them. This function filters them out.

    Parameters:
        directory: The directory being copied (provided by copytree)
        contents: List of names in the directory (provided by copytree)

    Returns:
        set: Names to ignore (Windows reserved device names)
    """
    ignored = set()
    for name in contents:
        stem = Path(name).stem.upper()
        if stem in RasUtils._WINDOWS_RESERVED_NAMES:
            logger.debug(f"Skipping Windows reserved name: {name} in {directory}")
            ignored.add(name)
    return ignored
is_windows_reserved_name staticmethod
Python
is_windows_reserved_name(name: str) -> bool

Check if a filename is a Windows reserved device name.

Parameters:

Name Type Description Default
name str

Filename to check

required

Returns:

Name Type Description
bool bool

True if the name is a reserved device name

Source code in ras_commander/RasUtils.py
Python
@staticmethod
def is_windows_reserved_name(name: str) -> bool:
    """
    Check if a filename is a Windows reserved device name.

    Parameters:
        name: Filename to check

    Returns:
        bool: True if the name is a reserved device name
    """
    stem = Path(name).stem.upper()
    return stem in RasUtils._WINDOWS_RESERVED_NAMES
find_files_by_extension staticmethod
Python
find_files_by_extension(extension: str, ras_object=None) -> list

List all files in the project directory with a specific extension.

Parameters: extension (str): File extension to filter (e.g., '.prj') ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Returns: list: List of file paths matching the extension

Example:

prj_files = RasUtils.find_files_by_extension('.prj') print(f"Found {len(prj_files)} .prj files")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def find_files_by_extension(extension: str, ras_object=None) -> list:
    """
    List all files in the project directory with a specific extension.

    Parameters:
    extension (str): File extension to filter (e.g., '.prj')
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Returns:
    list: List of file paths matching the extension

    Example:
    >>> prj_files = RasUtils.find_files_by_extension('.prj')
    >>> print(f"Found {len(prj_files)} .prj files")
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    try:
        files = list(ras_obj.project_folder.glob(f"*{extension}"))
        file_list = [str(file) for file in files]
        logger.info(f"Found {len(file_list)} files with extension '{extension}' in {ras_obj.project_folder}")
        return file_list
    except Exception as e:
        logger.error(f"Failed to find files with extension '{extension}': {e}")
        raise
get_file_size staticmethod
Python
get_file_size(file_path: Path, ras_object=None) -> Optional[int]

Get the size of a file in bytes.

Parameters: file_path (Path): Path to the file ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Returns: Optional[int]: Size of the file in bytes, or None if the file does not exist

Example:

size = RasUtils.get_file_size(Path("project.prj")) print(f"File size: {size} bytes")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def get_file_size(file_path: Path, ras_object=None) -> Optional[int]:
    """
    Get the size of a file in bytes.

    Parameters:
    file_path (Path): Path to the file
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Returns:
    Optional[int]: Size of the file in bytes, or None if the file does not exist

    Example:
    >>> size = RasUtils.get_file_size(Path("project.prj"))
    >>> print(f"File size: {size} bytes")
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    path = Path(file_path)
    if path.exists():
        try:
            size = path.stat().st_size
            logger.debug(f"Size of {path}: {size} bytes")
            return size
        except Exception as e:
            logger.error(f"Failed to get size for {path}: {e}")
            raise
    else:
        logger.warning(f"File not found: {path}")
        return None
get_file_modification_time staticmethod
Python
get_file_modification_time(file_path: Path, ras_object=None) -> Optional[float]

Get the last modification time of a file.

Parameters: file_path (Path): Path to the file ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Returns: Optional[float]: Last modification time as a timestamp, or None if the file does not exist

Example:

mtime = RasUtils.get_file_modification_time(Path("project.prj")) print(f"Last modified: {mtime}")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def get_file_modification_time(file_path: Path, ras_object=None) -> Optional[float]:
    """
    Get the last modification time of a file.

    Parameters:
    file_path (Path): Path to the file
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Returns:
    Optional[float]: Last modification time as a timestamp, or None if the file does not exist

    Example:
    >>> mtime = RasUtils.get_file_modification_time(Path("project.prj"))
    >>> print(f"Last modified: {mtime}")
    """

    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    path = Path(file_path)
    if path.exists():
        try:
            mtime = path.stat().st_mtime
            logger.debug(f"Last modification time of {path}: {mtime}")
            return mtime
        except Exception as e:
            logger.exception(f"Failed to get modification time for {path}")
            raise
    else:
        logger.warning(f"File not found: {path}")
        return None
normalize_ras_number staticmethod
Python
normalize_ras_number(ras_number: Union[str, int, float, Path, Number]) -> str

Normalize RAS file numbers to two-digit string format.

HEC-RAS uses two-digit file extensions for plans (.p01), geometries (.g02), flows (.f03), etc. This function standardizes various input formats to ensure consistent file path construction.

ras_number (Union[str, int, float, Path, Number]): Input number in various formats: - int: 1, 2, 3, etc. - str: "1", "01", "001", "p01", ".p01", "project.p01", etc. - float: 1.0, 2.0 (must be whole numbers) - Path: Path("project.p05") - extracts number from extension - Number: numpy.int64(1), etc.

Returns: str: Normalized two-digit format ("01", "02", ..., "99")

Raises: ValueError: If the number is not between 1 and 99, or cannot be converted TypeError: If the input type is invalid

Examples:

RasUtils.normalize_ras_number(1) '01' RasUtils.normalize_ras_number("1") '01' RasUtils.normalize_ras_number("01") '01' RasUtils.normalize_ras_number("001") '01' RasUtils.normalize_ras_number("p01") '01' RasUtils.normalize_ras_number(np.int64(5)) '05' RasUtils.normalize_ras_number(Path("project.p02")) '02'

Notes: - Used for plan numbers, geometry numbers, flow file numbers, etc. - Ensures consistent handling across all RAS file types - Prevents file path construction errors from unnormalized inputs

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def normalize_ras_number(ras_number: Union[str, int, float, Path, Number]) -> str:
    """
    Normalize RAS file numbers to two-digit string format.

    HEC-RAS uses two-digit file extensions for plans (.p01), geometries (.g02),
    flows (.f03), etc. This function standardizes various input formats to ensure
    consistent file path construction.

    Parameters:
    ras_number (Union[str, int, float, Path, Number]): Input number in various formats:
        - int: 1, 2, 3, etc.
        - str: "1", "01", "001", "p01", ".p01", "project.p01", etc.
        - float: 1.0, 2.0 (must be whole numbers)
        - Path: Path("project.p05") - extracts number from extension
        - Number: numpy.int64(1), etc.

    Returns:
    str: Normalized two-digit format ("01", "02", ..., "99")

    Raises:
    ValueError: If the number is not between 1 and 99, or cannot be converted
    TypeError: If the input type is invalid

    Examples:
    >>> RasUtils.normalize_ras_number(1)
    '01'
    >>> RasUtils.normalize_ras_number("1")
    '01'
    >>> RasUtils.normalize_ras_number("01")
    '01'
    >>> RasUtils.normalize_ras_number("001")
    '01'
    >>> RasUtils.normalize_ras_number("p01")
    '01'
    >>> RasUtils.normalize_ras_number(np.int64(5))
    '05'
    >>> RasUtils.normalize_ras_number(Path("project.p02"))
    '02'

    Notes:
    - Used for plan numbers, geometry numbers, flow file numbers, etc.
    - Ensures consistent handling across all RAS file types
    - Prevents file path construction errors from unnormalized inputs
    """
    # Handle Path objects - extract number from file extension
    if isinstance(ras_number, Path):
        # Extract from extensions like .p01, .g02, .f03, etc.
        suffix = ras_number.suffix  # e.g., ".p01"
        if len(suffix) >= 2 and suffix[0] == '.':
            # Try to extract number after the letter (e.g., "01" from ".p01")
            number_part = suffix[2:]  # Skip "." and letter
            if number_part.isdigit():
                ras_number = number_part
            else:
                raise ValueError(
                    f"Cannot extract RAS number from Path extension: {ras_number}. "
                    f"Expected format like 'project.p01' or 'geom.g02'"
                )
        else:
            raise ValueError(
                f"Cannot extract RAS number from Path: {ras_number}. "
                f"Expected file with RAS extension like .p01, .g02, etc."
            )

    # Convert to integer for validation
    try:
        # Handle string inputs including bare prefixed forms ("p01") and
        # filename/path strings ("project.p01").
        if isinstance(ras_number, str):
            text = ras_number.strip()
            path_suffix = Path(text).suffix
            if (
                len(path_suffix) >= 3
                and path_suffix[0] == "."
                and path_suffix[1].isalpha()
                and path_suffix[2:].isdigit()
            ):
                text = path_suffix[2:]
            elif (
                len(text) >= 2
                and text[0] == "."
                and text[1].isalpha()
                and text[2:].isdigit()
            ):
                text = text[2:]
            elif len(text) >= 2 and text[0].isalpha() and text[1:].isdigit():
                text = text[1:]

            stripped = text.lstrip('0')
            if not stripped or not stripped.isdigit():
                # Handle edge cases like "0", "00", or non-numeric strings
                if not stripped:  # Was all zeros
                    ras_int = 0
                else:
                    raise ValueError(f"Cannot convert '{ras_number}' to integer")
            else:
                ras_int = int(stripped)
        else:
            # Handle numeric types (int, float, numpy types, etc.)
            ras_int = int(ras_number)

            # Check if float had decimal component
            if isinstance(ras_number, (float, np.floating)) and ras_number != ras_int:
                raise ValueError(
                    f"RAS numbers must be integers, got float with decimals: {ras_number}"
                )

    except (ValueError, TypeError) as e:
        raise ValueError(
            f"Cannot convert RAS number '{ras_number}' (type: {type(ras_number).__name__}) "
            f"to integer: {e}"
        ) from e

    # Validate range (1-99 for HEC-RAS files)
    if not 1 <= ras_int <= 99:
        raise ValueError(
            f"RAS file number must be between 1 and 99, got: {ras_int}"
        )

    # Return normalized two-digit format
    normalized = f"{ras_int:02d}"
    logger.debug(f"Normalized RAS number '{ras_number}' to '{normalized}'")
    return normalized
get_plan_path staticmethod
Python
get_plan_path(current_plan_number_or_path: Union[str, Number, Path], ras_object=None) -> Path

Get the path for a plan file with a given plan number or path.

Parameters: current_plan_number_or_path (Union[str, Number, Path]): The plan number (e.g., '01', 1, or 1.0) or full path to the plan file ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Returns: Path: Full path to the plan file

Raises: ValueError: If plan number is not between 1 and 99 TypeError: If input type is invalid FileNotFoundError: If the plan file does not exist

Example:

plan_path = RasUtils.get_plan_path(1) print(f"Plan file path: {plan_path}") plan_path = RasUtils.get_plan_path("01") print(f"Plan file path: {plan_path}") plan_path = RasUtils.get_plan_path("path/to/plan.p01") print(f"Plan file path: {plan_path}")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def get_plan_path(current_plan_number_or_path: Union[str, Number, Path], ras_object=None) -> Path:
    """
    Get the path for a plan file with a given plan number or path.

    Parameters:
    current_plan_number_or_path (Union[str, Number, Path]): The plan number (e.g., '01', 1, or 1.0) or full path to the plan file
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Returns:
    Path: Full path to the plan file

    Raises:
    ValueError: If plan number is not between 1 and 99
    TypeError: If input type is invalid
    FileNotFoundError: If the plan file does not exist

    Example:
    >>> plan_path = RasUtils.get_plan_path(1)
    >>> print(f"Plan file path: {plan_path}")
    >>> plan_path = RasUtils.get_plan_path("01")
    >>> print(f"Plan file path: {plan_path}")
    >>> plan_path = RasUtils.get_plan_path("path/to/plan.p01")
    >>> print(f"Plan file path: {plan_path}")
    """
    # Validate RAS object
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    # Handle direct file path input
    plan_path = Path(current_plan_number_or_path)
    if plan_path.is_file():
        logger.debug(f"Using provided plan file path: {plan_path}")
        return plan_path

    # Handle plan number input - use centralized normalization
    try:
        current_plan_number = RasUtils.normalize_ras_number(current_plan_number_or_path)
        logger.debug(f"Normalized plan number to: {current_plan_number}")
    except (ValueError, TypeError) as e:
        logger.error(f"Invalid plan number: {current_plan_number_or_path}. {e}")
        raise

    # Construct and validate plan path
    plan_name = f"{ras_obj.project_name}.p{current_plan_number}"
    full_plan_path = ras_obj.project_folder / plan_name

    if not full_plan_path.exists():
        logger.error(f"Plan file does not exist: {full_plan_path}")
        raise FileNotFoundError(f"Plan file does not exist: {full_plan_path}")

    logger.debug(f"Constructed plan file path: {full_plan_path}")
    return full_plan_path
remove_with_retry staticmethod
Python
remove_with_retry(path: Path, max_attempts: int = 5, initial_delay: float = 1.0, is_folder: bool = True, ras_object=None) -> bool

Attempts to remove a file or folder with retry logic and exponential backoff.

Parameters: path (Path): Path to the file or folder to be removed. max_attempts (int): Maximum number of removal attempts. initial_delay (float): Initial delay between attempts in seconds. is_folder (bool): If True, the path is treated as a folder; if False, it's treated as a file. ras_object (RasPrj, optional): Accepted for backward compatibility. The cleanup does not require an initialized RAS project, so it can be used before project extraction or during worker-folder cleanup.

Returns: bool: True if the file or folder was successfully removed, False otherwise.

Example:

success = RasUtils.remove_with_retry(Path("temp_folder"), is_folder=True) print(f"Removal successful: {success}")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def remove_with_retry(
    path: Path,
    max_attempts: int = 5,
    initial_delay: float = 1.0,
    is_folder: bool = True,
    ras_object=None
) -> bool:
    """
    Attempts to remove a file or folder with retry logic and exponential backoff.

    Parameters:
    path (Path): Path to the file or folder to be removed.
    max_attempts (int): Maximum number of removal attempts.
    initial_delay (float): Initial delay between attempts in seconds.
    is_folder (bool): If True, the path is treated as a folder; if False, it's treated as a file.
    ras_object (RasPrj, optional): Accepted for backward compatibility. The
        cleanup does not require an initialized RAS project, so it can be used
        before project extraction or during worker-folder cleanup.

    Returns:
    bool: True if the file or folder was successfully removed, False otherwise.

    Example:
    >>> success = RasUtils.remove_with_retry(Path("temp_folder"), is_folder=True)
    >>> print(f"Removal successful: {success}")
    """
    path = Path(path)
    for attempt in range(1, max_attempts + 1):
        try:
            if path.exists():
                if is_folder:
                    shutil.rmtree(path)
                    logger.debug(f"Folder removed: {path}")
                else:
                    path.unlink()
                    logger.debug(f"File removed: {path}")
            else:
                logger.debug(f"Path does not exist, nothing to remove: {path}")
            return True
        except PermissionError as pe:
            if attempt < max_attempts:
                delay = initial_delay * (2 ** (attempt - 1))  # Exponential backoff
                logger.warning(
                    f"PermissionError on attempt {attempt} to remove {path}: {pe}. "
                    f"Retrying in {delay} seconds..."
                )
                time.sleep(delay)
            else:
                logger.error(
                    f"Failed to remove {path} after {max_attempts} attempts due to PermissionError: {pe}. Skipping."
                )
                return False
        except Exception as e:
            logger.exception(f"Failed to remove {path} on attempt {attempt}")
            return False
    return False
update_plan_file staticmethod
Python
update_plan_file(plan_number_or_path: Union[str, Path], file_type: str, entry_number: int, ras_object=None) -> None

Update a plan file with a new file reference.

Parameters: plan_number_or_path (Union[str, Path]): The plan number (1 to 99) or full path to the plan file file_type (str): Type of file to update ('Geom', 'Flow', or 'Unsteady') entry_number (int): Number (from 1 to 99) to set ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Raises: ValueError: If an invalid file_type is provided FileNotFoundError: If the plan file doesn't exist

Example:

RasUtils.update_plan_file(1, "Geom", 2) RasUtils.update_plan_file("path/to/plan.p01", "Geom", 2)

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def update_plan_file(
    plan_number_or_path: Union[str, Path],
    file_type: str,
    entry_number: int,
    ras_object=None
) -> None:
    """
    Update a plan file with a new file reference.

    Parameters:
    plan_number_or_path (Union[str, Path]): The plan number (1 to 99) or full path to the plan file
    file_type (str): Type of file to update ('Geom', 'Flow', or 'Unsteady')
    entry_number (int): Number (from 1 to 99) to set
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Raises:
    ValueError: If an invalid file_type is provided
    FileNotFoundError: If the plan file doesn't exist

    Example:
    >>> RasUtils.update_plan_file(1, "Geom", 2)
    >>> RasUtils.update_plan_file("path/to/plan.p01", "Geom", 2)
    """

    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    valid_file_types = {'Geom': 'g', 'Flow': 'f', 'Unsteady': 'u'}
    if file_type not in valid_file_types:
        logger.error(
            f"Invalid file_type '{file_type}'. Expected one of: {', '.join(valid_file_types.keys())}"
        )
        raise ValueError(
            f"Invalid file_type. Expected one of: {', '.join(valid_file_types.keys())}"
        )

    plan_file_path = Path(plan_number_or_path)
    if not plan_file_path.is_file():
        plan_file_path = RasUtils.get_plan_path(plan_number_or_path, ras_object)
        if not plan_file_path.exists():
            logger.error(f"Plan file not found: {plan_file_path}")
            raise FileNotFoundError(f"Plan file not found: {plan_file_path}")

    file_prefix = valid_file_types[file_type]
    search_pattern = f"{file_type} File="
    formatted_entry_number = f"{int(entry_number):02d}"  # Ensure two-digit format

    try:
        RasUtils.check_file_access(plan_file_path, 'r')
        with plan_file_path.open('r') as file:
            lines = file.readlines()
    except Exception as e:
        logger.exception(f"Failed to read plan file {plan_file_path}")
        raise

    updated = False
    for i, line in enumerate(lines):
        if line.startswith(search_pattern):
            lines[i] = f"{search_pattern}{file_prefix}{formatted_entry_number}\n"
            logger.info(
                f"Updated {file_type} File in {plan_file_path} to {file_prefix}{formatted_entry_number}"
            )
            updated = True
            break

    if not updated:
        logger.warning(
            f"Search pattern '{search_pattern}' not found in {plan_file_path}. No update performed."
        )

    try:
        with plan_file_path.open('w') as file:
            file.writelines(lines)
        logger.info(f"Successfully updated plan file: {plan_file_path}")
    except Exception as e:
        logger.exception(f"Failed to write updates to plan file {plan_file_path}")
        raise

    # Refresh RasPrj dataframes
    try:
        ras_obj.plan_df = ras_obj.get_plan_entries()
        ras_obj.geom_df = ras_obj.get_geom_entries()
        ras_obj.flow_df = ras_obj.get_flow_entries()
        ras_obj.unsteady_df = ras_obj.get_unsteady_entries()
        logger.debug("RAS object dataframes have been refreshed.")
    except Exception as e:
        logger.exception("Failed to refresh RasPrj dataframes")
        raise
check_file_access staticmethod
Python
check_file_access(file_path: Path, mode: str = 'r') -> None

Check if the file can be accessed with the specified mode.

Parameters: file_path (Path): Path to the file mode (str): Mode to check ('r' for read, 'w' for write, etc.)

Raises: FileNotFoundError: If the file does not exist PermissionError: If the required permissions are not met

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def check_file_access(file_path: Path, mode: str = 'r') -> None:
    """
    Check if the file can be accessed with the specified mode.

    Parameters:
    file_path (Path): Path to the file
    mode (str): Mode to check ('r' for read, 'w' for write, etc.)

    Raises:
    FileNotFoundError: If the file does not exist
    PermissionError: If the required permissions are not met
    """

    path = Path(file_path)
    if not path.exists():
        logger.error(f"File not found: {file_path}")
        raise FileNotFoundError(f"File not found: {file_path}")

    if mode in ('r', 'rb'):
        if not os.access(path, os.R_OK):
            logger.error(f"Read permission denied for file: {file_path}")
            raise PermissionError(f"Read permission denied for file: {file_path}")
        else:
            logger.debug(f"Read access granted for file: {file_path}")

    if mode in ('w', 'wb', 'a', 'ab'):
        parent_dir = path.parent
        if not os.access(parent_dir, os.W_OK):
            logger.error(f"Write permission denied for directory: {parent_dir}")
            raise PermissionError(f"Write permission denied for directory: {parent_dir}")
        else:
            logger.debug(f"Write access granted for directory: {parent_dir}")
convert_to_dataframe staticmethod
Python
convert_to_dataframe(data_source: Union[DataFrame, Path], **kwargs: Any) -> pd.DataFrame

Converts input to a pandas DataFrame. Supports existing DataFrames or file paths (CSV, Excel, TSV, Parquet).

Parameters:

Name Type Description Default
data_source Union[DataFrame, Path]

The input to convert to a DataFrame. Can be a file path or an existing DataFrame.

required
**kwargs Any

Additional keyword arguments to pass to pandas read functions.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The resulting DataFrame.

Raises:

Type Description
NotImplementedError

If the file type is unsupported or input type is invalid.

Example

df = RasUtils.convert_to_dataframe(Path("data.csv")) print(type(df))

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def convert_to_dataframe(
    data_source: Union[pd.DataFrame, Path],
    **kwargs: Any
) -> pd.DataFrame:
    """
    Converts input to a pandas DataFrame. Supports existing DataFrames or file paths (CSV, Excel, TSV, Parquet).

    Args:
        data_source (Union[pd.DataFrame, Path]): The input to convert to a DataFrame. Can be a file path or an existing DataFrame.
        **kwargs: Additional keyword arguments to pass to pandas read functions.

    Returns:
        pd.DataFrame: The resulting DataFrame.

    Raises:
        NotImplementedError: If the file type is unsupported or input type is invalid.

    Example:
        >>> df = RasUtils.convert_to_dataframe(Path("data.csv"))
        >>> print(type(df))
        <class 'pandas.core.frame.DataFrame'>
    """
    if isinstance(data_source, pd.DataFrame):
        logger.debug("Input is already a DataFrame, returning a copy.")
        return data_source.copy()
    elif isinstance(data_source, Path):
        ext = data_source.suffix.replace('.', '', 1)
        logger.debug(f"Converting file with extension '{ext}' to DataFrame.")
        if ext == 'csv':
            return pd.read_csv(data_source, **kwargs)
        elif ext.startswith('x'):
            return pd.read_excel(data_source, **kwargs)
        elif ext == "tsv":
            return pd.read_csv(data_source, sep="\t", **kwargs)
        elif ext in ["parquet", "pq", "parq"]:
            return pd.read_parquet(data_source, **kwargs)
        else:
            logger.error(f"Unsupported file type: {ext}")
            raise NotImplementedError(f"Unsupported file type {ext}. Should be one of csv, tsv, parquet, or xlsx.")
    else:
        logger.error(f"Unsupported input type: {type(data_source)}")
        raise NotImplementedError(f"Unsupported type {type(data_source)}. Only file path / existing DataFrame supported at this time")
save_to_excel staticmethod
Python
save_to_excel(dataframe: DataFrame, excel_path: Path, **kwargs: Any) -> None

Saves a pandas DataFrame to an Excel file with retry functionality.

Parameters:

Name Type Description Default
dataframe DataFrame

The DataFrame to save.

required
excel_path Path

The path to the Excel file where the DataFrame will be saved.

required
**kwargs Any

Additional keyword arguments passed to DataFrame.to_excel().

{}

Raises:

Type Description
IOError

If the file cannot be saved after multiple attempts.

Example

df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}) RasUtils.save_to_excel(df, Path('output.xlsx'))

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def save_to_excel(
    dataframe: pd.DataFrame,
    excel_path: Path,
    **kwargs: Any
) -> None:
    """
    Saves a pandas DataFrame to an Excel file with retry functionality.

    Args:
        dataframe (pd.DataFrame): The DataFrame to save.
        excel_path (Path): The path to the Excel file where the DataFrame will be saved.
        **kwargs: Additional keyword arguments passed to `DataFrame.to_excel()`.

    Raises:
        IOError: If the file cannot be saved after multiple attempts.

    Example:
        >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
        >>> RasUtils.save_to_excel(df, Path('output.xlsx'))
    """
    saved = False
    max_attempts = 3
    attempt = 0

    while not saved and attempt < max_attempts:
        try:
            dataframe.to_excel(excel_path, **kwargs)
            logger.info(f'DataFrame successfully saved to {excel_path}')
            saved = True
        except IOError as e:
            attempt += 1
            if attempt < max_attempts:
                logger.warning(f"Error saving file. Attempt {attempt} of {max_attempts}. Please close the Excel document if it's open.")
            else:
                logger.error(f"Failed to save {excel_path} after {max_attempts} attempts.")
                raise IOError(f"Failed to save {excel_path} after {max_attempts} attempts. Last error: {str(e)}")
calculate_rmse staticmethod
Python
calculate_rmse(observed_values: ndarray, predicted_values: ndarray, normalized: bool = True) -> float

Calculate the Root Mean Squared Error (RMSE) between observed and predicted values.

Parameters:

Name Type Description Default
observed_values ndarray

Actual observations time series.

required
predicted_values ndarray

Estimated/predicted time series.

required
normalized bool

Whether to normalize RMSE to a percentage of observed_values. Defaults to True.

True

Returns:

Name Type Description
float float

The calculated RMSE value.

Example

observed = np.array([1, 2, 3]) predicted = np.array([1.1, 2.2, 2.9]) RasUtils.calculate_rmse(observed, predicted) 0.06396394

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def calculate_rmse(observed_values: np.ndarray, predicted_values: np.ndarray, normalized: bool = True) -> float:
    """
    Calculate the Root Mean Squared Error (RMSE) between observed and predicted values.

    Args:
        observed_values (np.ndarray): Actual observations time series.
        predicted_values (np.ndarray): Estimated/predicted time series.
        normalized (bool, optional): Whether to normalize RMSE to a percentage of observed_values. Defaults to True.

    Returns:
        float: The calculated RMSE value.

    Example:
        >>> observed = np.array([1, 2, 3])
        >>> predicted = np.array([1.1, 2.2, 2.9])
        >>> RasUtils.calculate_rmse(observed, predicted)
        0.06396394
    """
    rmse = np.sqrt(np.mean((predicted_values - observed_values) ** 2))

    if normalized:
        rmse = rmse / np.abs(np.mean(observed_values))

    logger.debug(f"Calculated RMSE: {rmse}")
    return rmse
calculate_percent_bias staticmethod
Python
calculate_percent_bias(observed_values: ndarray, predicted_values: ndarray, as_percentage: bool = False) -> float

Calculate the Percent Bias between observed and predicted values.

Parameters:

Name Type Description Default
observed_values ndarray

Actual observations time series.

required
predicted_values ndarray

Estimated/predicted time series.

required
as_percentage bool

If True, return bias as a percentage. Defaults to False.

False

Returns:

Name Type Description
float float

The calculated Percent Bias.

Example

observed = np.array([1, 2, 3]) predicted = np.array([1.1, 2.2, 2.9]) RasUtils.calculate_percent_bias(observed, predicted, as_percentage=True) 3.33333333

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def calculate_percent_bias(observed_values: np.ndarray, predicted_values: np.ndarray, as_percentage: bool = False) -> float:
    """
    Calculate the Percent Bias between observed and predicted values.

    Args:
        observed_values (np.ndarray): Actual observations time series.
        predicted_values (np.ndarray): Estimated/predicted time series.
        as_percentage (bool, optional): If True, return bias as a percentage. Defaults to False.

    Returns:
        float: The calculated Percent Bias.

    Example:
        >>> observed = np.array([1, 2, 3])
        >>> predicted = np.array([1.1, 2.2, 2.9])
        >>> RasUtils.calculate_percent_bias(observed, predicted, as_percentage=True)
        3.33333333
    """
    multiplier = 100 if as_percentage else 1

    obs_mean = np.mean(observed_values)
    if obs_mean == 0:
        logger.warning("Percent bias undefined: mean of observed values is zero")
        return np.nan

    percent_bias = multiplier * (np.mean(predicted_values) - obs_mean) / obs_mean

    logger.debug(f"Calculated Percent Bias: {percent_bias}")
    return percent_bias
calculate_error_metrics staticmethod
Python
calculate_error_metrics(observed_values: ndarray, predicted_values: ndarray) -> Dict[str, float]

Compute a trio of error metrics: correlation, RMSE, and Percent Bias.

Parameters:

Name Type Description Default
observed_values ndarray

Actual observations time series.

required
predicted_values ndarray

Estimated/predicted time series.

required

Returns:

Type Description
Dict[str, float]

Dict[str, float]: A dictionary containing correlation ('cor'), RMSE ('rmse'), and Percent Bias ('pb').

Example

observed = np.array([1, 2, 3]) predicted = np.array([1.1, 2.2, 2.9]) RasUtils.calculate_error_metrics(observed, predicted)

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def calculate_error_metrics(observed_values: np.ndarray, predicted_values: np.ndarray) -> Dict[str, float]:
    """
    Compute a trio of error metrics: correlation, RMSE, and Percent Bias.

    Args:
        observed_values (np.ndarray): Actual observations time series.
        predicted_values (np.ndarray): Estimated/predicted time series.

    Returns:
        Dict[str, float]: A dictionary containing correlation ('cor'), RMSE ('rmse'), and Percent Bias ('pb').

    Example:
        >>> observed = np.array([1, 2, 3])
        >>> predicted = np.array([1.1, 2.2, 2.9])
        >>> RasUtils.calculate_error_metrics(observed, predicted)
        {'cor': 0.9993, 'rmse': 0.06396, 'pb': 0.03333}
    """
    correlation = np.corrcoef(observed_values, predicted_values)[0, 1]
    rmse = RasUtils.calculate_rmse(observed_values, predicted_values)
    percent_bias = RasUtils.calculate_percent_bias(observed_values, predicted_values)

    metrics = {'cor': correlation, 'rmse': rmse, 'pb': percent_bias}
    logger.debug(f"Calculated error metrics: {metrics}")
    return metrics
update_file staticmethod
Python
update_file(file_path: Path, update_function: Callable, *args) -> None

Generic method to update a file.

Parameters: file_path (Path): Path to the file to be updated update_function (Callable): Function to update the file contents *args: Additional arguments to pass to the update_function

Raises: Exception: If there's an error updating the file

Example:

def update_content(lines, new_value): ... lines[0] = f"New value: {new_value}\n" ... return lines RasUtils.update_file(Path("example.txt"), update_content, "Hello")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def update_file(file_path: Path, update_function: Callable, *args) -> None:
    """
    Generic method to update a file.

    Parameters:
    file_path (Path): Path to the file to be updated
    update_function (Callable): Function to update the file contents
    *args: Additional arguments to pass to the update_function

    Raises:
    Exception: If there's an error updating the file

    Example:
    >>> def update_content(lines, new_value):
    ...     lines[0] = f"New value: {new_value}\\n"
    ...     return lines
    >>> RasUtils.update_file(Path("example.txt"), update_content, "Hello")
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()

        updated_lines = update_function(lines, *args) if args else update_function(lines)

        with open(file_path, 'w', encoding='utf-8', errors='replace') as f:
            f.writelines(updated_lines)
        logger.info(f"Successfully updated file: {file_path}")
    except Exception as e:
        logger.exception(f"Failed to update file {file_path}")
        raise
get_next_number staticmethod
Python
get_next_number(existing_numbers: list) -> str

Determine the next available number from a list of existing numbers.

Parameters: existing_numbers (list): List of existing numbers as strings

Returns: str: Next available number as a zero-padded string

Example:

RasUtils.get_next_number(["01", "02", "04"]) "05"

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def get_next_number(existing_numbers: list) -> str:
    """
    Determine the next available number from a list of existing numbers.

    Parameters:
    existing_numbers (list): List of existing numbers as strings

    Returns:
    str: Next available number as a zero-padded string

    Example:
    >>> RasUtils.get_next_number(["01", "02", "04"])
    "05"
    """
    existing_numbers = sorted(int(num) for num in existing_numbers)
    next_number = max(existing_numbers, default=0) + 1
    return f"{next_number:02d}"
clone_file staticmethod
Python
clone_file(template_path: Path, new_path: Path, update_function: Optional[Callable] = None, *args) -> None

Generic method to clone a file and optionally update it.

Parameters: template_path (Path): Path to the template file new_path (Path): Path where the new file will be created update_function (Optional[Callable]): Function to update the cloned file *args: Additional arguments to pass to the update_function

Raises: FileNotFoundError: If the template file doesn't exist

Example:

def update_content(lines, new_value): ... lines[0] = f"New value: {new_value}\n" ... return lines RasUtils.clone_file(Path("template.txt"), Path("new.txt"), update_content, "Hello")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def clone_file(template_path: Path, new_path: Path, update_function: Optional[Callable] = None, *args) -> None:
    """
    Generic method to clone a file and optionally update it.

    Parameters:
    template_path (Path): Path to the template file
    new_path (Path): Path where the new file will be created
    update_function (Optional[Callable]): Function to update the cloned file
    *args: Additional arguments to pass to the update_function

    Raises:
    FileNotFoundError: If the template file doesn't exist

    Example:
    >>> def update_content(lines, new_value):
    ...     lines[0] = f"New value: {new_value}\\n"
    ...     return lines
    >>> RasUtils.clone_file(Path("template.txt"), Path("new.txt"), update_content, "Hello")
    """
    if not template_path.exists():
        logger.error(f"Template file '{template_path}' does not exist.")
        raise FileNotFoundError(f"Template file '{template_path}' does not exist.")

    shutil.copy(template_path, new_path)
    logger.info(f"File cloned from {template_path} to {new_path}")

    if update_function:
        RasUtils.update_file(new_path, update_function, *args)
update_project_file staticmethod
Python
update_project_file(prj_file: Path, file_type: str, new_num: str, ras_object=None) -> None

Update the project file with a new entry.

Parameters: prj_file (Path): Path to the project file file_type (str): Type of file being added (e.g., 'Plan', 'Geom') new_num (str): Number of the new file entry ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Example:

RasUtils.update_project_file(Path("project.prj"), "Plan", "02")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def update_project_file(prj_file: Path, file_type: str, new_num: str, ras_object=None) -> None:
    """
    Update the project file with a new entry.

    Parameters:
    prj_file (Path): Path to the project file
    file_type (str): Type of file being added (e.g., 'Plan', 'Geom')
    new_num (str): Number of the new file entry
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Example:
    >>> RasUtils.update_project_file(Path("project.prj"), "Plan", "02")
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    try:
        with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()

        new_line = f"{file_type} File={file_type[0].lower()}{new_num}\n"
        lines.append(new_line)

        with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
            f.writelines(lines)
        logger.info(f"Project file updated with new {file_type} entry: {new_num}")
    except Exception as e:
        logger.exception(f"Failed to update project file {prj_file}")
        raise
remove_prj_entry staticmethod
Python
remove_prj_entry(prj_file: Path, file_type: str, number: str, ras_object=None) -> None

Remove a file entry from the .prj file.

Parameters: prj_file (Path): Path to the project file file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow') number (str): Two-digit number of the entry to remove (e.g., '05') ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Example:

RasUtils.remove_prj_entry(Path("project.prj"), "Plan", "05")

Removes the line "Plan File=p05" from the .prj file
Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def remove_prj_entry(prj_file: Path, file_type: str, number: str, ras_object=None) -> None:
    """
    Remove a file entry from the .prj file.

    Parameters:
    prj_file (Path): Path to the project file
    file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow')
    number (str): Two-digit number of the entry to remove (e.g., '05')
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Example:
    >>> RasUtils.remove_prj_entry(Path("project.prj"), "Plan", "05")
    # Removes the line "Plan File=p05" from the .prj file
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    prefix = file_type[0].lower()
    target = f"{file_type} File={prefix}{number}"

    try:
        with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()

        new_lines = [line for line in lines if line.strip() != target]

        if len(new_lines) == len(lines):
            logger.warning(f"Entry '{target}' not found in {prj_file}")
            return

        with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
            f.writelines(new_lines)
        logger.info(f"Removed {file_type} entry {number} from project file")
    except Exception as e:
        logger.exception(f"Failed to remove entry from project file {prj_file}")
        raise
rename_prj_entry staticmethod
Python
rename_prj_entry(prj_file: Path, file_type: str, old_number: str, new_number: str, ras_object=None) -> None

Rename a file entry in the .prj file.

Parameters: prj_file (Path): Path to the project file file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow') old_number (str): Current two-digit number (e.g., '05') new_number (str): New two-digit number (e.g., '02') ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

Example:

RasUtils.rename_prj_entry(Path("project.prj"), "Plan", "05", "02")

Changes "Plan File=p05" to "Plan File=p02" in the .prj file
Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def rename_prj_entry(prj_file: Path, file_type: str, old_number: str, new_number: str, ras_object=None) -> None:
    """
    Rename a file entry in the .prj file.

    Parameters:
    prj_file (Path): Path to the project file
    file_type (str): Type of file entry ('Plan', 'Geom', 'Unsteady', or 'Flow')
    old_number (str): Current two-digit number (e.g., '05')
    new_number (str): New two-digit number (e.g., '02')
    ras_object (RasPrj, optional): RAS object to use. If None, uses the default ras object.

    Example:
    >>> RasUtils.rename_prj_entry(Path("project.prj"), "Plan", "05", "02")
    # Changes "Plan File=p05" to "Plan File=p02" in the .prj file
    """
    ras_obj = ras_object or ras
    ras_obj.check_initialized()

    prefix = file_type[0].lower()
    old_line = f"{file_type} File={prefix}{old_number}"
    new_line_content = f"{file_type} File={prefix}{new_number}"

    try:
        with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()

        found = False
        for i, line in enumerate(lines):
            if line.strip() == old_line:
                lines[i] = new_line_content + '\n'
                found = True
                break

        if not found:
            logger.warning(f"Entry '{old_line}' not found in {prj_file}")
            return

        with open(prj_file, 'w', encoding='utf-8', errors='replace') as f:
            f.writelines(lines)
        logger.info(f"Renamed {file_type} entry {old_number} to {new_number} in project file")
    except Exception as e:
        logger.exception(f"Failed to rename entry in project file {prj_file}")
        raise
backup_files staticmethod
Python
backup_files(files: List[Union[Path, str]], project_folder: Union[Path, str], operation_label: str = 'deleted') -> Optional[Path]

Move files to a timestamped Backup folder inside the project.

Creates {project_folder}/Backup/{YYYY-MM-DD_HHMMSS}_{operation_label}/ and moves each existing file into that folder. Non-existent files are silently skipped.

Parameters: files (List[Union[Path, str]]): File paths to back up (str or Path). project_folder (Union[Path, str]): Project root where Backup/ will be created. operation_label (str): Label appended to timestamp folder name (e.g., "deleted_p05").

Returns: Optional[Path]: Path to backup folder if any files were moved, None otherwise.

Example:

files = [Path("Muncie.p05"), Path("Muncie.p05.hdf")] backup_dir = RasUtils.backup_files(files, project_folder, "deleted_p05")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def backup_files(
    files: List[Union[Path, str]],
    project_folder: Union[Path, str],
    operation_label: str = "deleted",
) -> Optional[Path]:
    """
    Move files to a timestamped Backup folder inside the project.

    Creates {project_folder}/Backup/{YYYY-MM-DD_HHMMSS}_{operation_label}/
    and moves each existing file into that folder. Non-existent files are
    silently skipped.

    Parameters:
    files (List[Union[Path, str]]): File paths to back up (str or Path).
    project_folder (Union[Path, str]): Project root where Backup/ will be created.
    operation_label (str): Label appended to timestamp folder name (e.g., "deleted_p05").

    Returns:
    Optional[Path]: Path to backup folder if any files were moved, None otherwise.

    Example:
    >>> files = [Path("Muncie.p05"), Path("Muncie.p05.hdf")]
    >>> backup_dir = RasUtils.backup_files(files, project_folder, "deleted_p05")
    """
    files = [Path(f) for f in files]
    existing_files = [f for f in files if f.exists()]
    if not existing_files:
        return None

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    backup_folder = Path(project_folder) / "Backup" / f"{timestamp}_{operation_label}"
    backup_folder.mkdir(parents=True, exist_ok=True)

    for f in existing_files:
        shutil.move(str(f), str(backup_folder / f.name))
        logger.info(f"Backed up {f.name} to {backup_folder}")

    return backup_folder
decode_byte_strings staticmethod
Python
decode_byte_strings(dataframe: DataFrame) -> pd.DataFrame

Decodes byte strings in a DataFrame to regular string objects.

This function converts columns with byte-encoded strings (e.g., b'string') into UTF-8 decoded strings.

Parameters:

Name Type Description Default
dataframe DataFrame

The DataFrame containing byte-encoded string columns.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame with byte strings decoded to regular strings.

Example

df = pd.DataFrame({'A': [b'hello', b'world'], 'B': [1, 2]}) decoded_df = RasUtils.decode_byte_strings(df) print(decoded_df) A B 0 hello 1 1 world 2

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def decode_byte_strings(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Decodes byte strings in a DataFrame to regular string objects.

    This function converts columns with byte-encoded strings (e.g., b'string') into UTF-8 decoded strings.

    Args:
        dataframe (pd.DataFrame): The DataFrame containing byte-encoded string columns.

    Returns:
        pd.DataFrame: The DataFrame with byte strings decoded to regular strings.

    Example:
        >>> df = pd.DataFrame({'A': [b'hello', b'world'], 'B': [1, 2]})
        >>> decoded_df = RasUtils.decode_byte_strings(df)
        >>> print(decoded_df)
            A  B
        0  hello  1
        1  world  2
    """
    str_df = dataframe.select_dtypes(['object'])
    str_df = str_df.stack().str.decode('utf-8').unstack()
    for col in str_df:
        dataframe[col] = str_df[col]
    return dataframe
perform_kdtree_query staticmethod
Python
perform_kdtree_query(reference_points: ndarray, query_points: ndarray, max_distance: float = 2.0) -> np.ndarray

Performs a KDTree query between two datasets and returns indices with distances exceeding max_distance set to -1.

Parameters:

Name Type Description Default
reference_points ndarray

The reference dataset for KDTree.

required
query_points ndarray

The query dataset to search against KDTree of reference_points.

required
max_distance float

The maximum distance threshold. Indices with distances greater than this are set to -1. Defaults to 2.0.

2.0

Returns:

Type Description
ndarray

np.ndarray: Array of indices from reference_points that are nearest to each point in query_points. Indices with distances > max_distance are set to -1.

Example

ref_points = np.array([[0, 0], [1, 1], [2, 2]]) query_points = np.array([[0.5, 0.5], [3, 3]]) result = RasUtils.perform_kdtree_query(ref_points, query_points) print(result) array([ 0, -1])

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def perform_kdtree_query(
    reference_points: np.ndarray,
    query_points: np.ndarray,
    max_distance: float = 2.0
) -> np.ndarray:
    """
    Performs a KDTree query between two datasets and returns indices with distances exceeding max_distance set to -1.

    Args:
        reference_points (np.ndarray): The reference dataset for KDTree.
        query_points (np.ndarray): The query dataset to search against KDTree of reference_points.
        max_distance (float, optional): The maximum distance threshold. Indices with distances greater than this are set to -1. Defaults to 2.0.

    Returns:
        np.ndarray: Array of indices from reference_points that are nearest to each point in query_points. 
                    Indices with distances > max_distance are set to -1.

    Example:
        >>> ref_points = np.array([[0, 0], [1, 1], [2, 2]])
        >>> query_points = np.array([[0.5, 0.5], [3, 3]])
        >>> result = RasUtils.perform_kdtree_query(ref_points, query_points)
        >>> print(result)
        array([ 0, -1])
    """
    dist, snap = KDTree(reference_points).query(query_points, distance_upper_bound=max_distance)
    snap[dist > max_distance] = -1
    return snap
find_nearest_neighbors staticmethod
Python
find_nearest_neighbors(points: ndarray, max_distance: float = 2.0) -> np.ndarray

Creates a self KDTree for dataset points and finds nearest neighbors excluding self, with distances above max_distance set to -1.

Parameters:

Name Type Description Default
points ndarray

The dataset to build the KDTree from and query against itself.

required
max_distance float

The maximum distance threshold. Indices with distances greater than max_distance are set to -1. Defaults to 2.0.

2.0

Returns:

Type Description
ndarray

np.ndarray: Array of indices representing the nearest neighbor in points for each point in points. Indices with distances > max_distance or self-matches are set to -1.

Example

points = np.array([[0, 0], [1, 1], [2, 2], [10, 10]]) result = RasUtils.find_nearest_neighbors(points) print(result) array([1, 0, 1, -1])

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def find_nearest_neighbors(points: np.ndarray, max_distance: float = 2.0) -> np.ndarray:
    """
    Creates a self KDTree for dataset points and finds nearest neighbors excluding self, 
    with distances above max_distance set to -1.

    Args:
        points (np.ndarray): The dataset to build the KDTree from and query against itself.
        max_distance (float, optional): The maximum distance threshold. Indices with distances 
                                        greater than max_distance are set to -1. Defaults to 2.0.

    Returns:
        np.ndarray: Array of indices representing the nearest neighbor in points for each point in points. 
                    Indices with distances > max_distance or self-matches are set to -1.

    Example:
        >>> points = np.array([[0, 0], [1, 1], [2, 2], [10, 10]])
        >>> result = RasUtils.find_nearest_neighbors(points)
        >>> print(result)
        array([1, 0, 1, -1])
    """
    dist, snap = KDTree(points).query(points, k=2, distance_upper_bound=max_distance)
    snap[dist > max_distance] = -1

    snp = pd.DataFrame(snap, index=np.arange(len(snap)))
    snp = snp.replace(-1, np.nan)
    snp.loc[snp[0] == snp.index, 0] = np.nan
    snp.loc[snp[1] == snp.index, 1] = np.nan
    filled = snp[0].fillna(snp[1])
    snapped = filled.fillna(-1).astype(np.int64).to_numpy()
    return snapped
consolidate_dataframe staticmethod
Python
consolidate_dataframe(dataframe: DataFrame, group_by: Optional[Union[str, List[str]]] = None, pivot_columns: Optional[Union[str, List[str]]] = None, level: Optional[int] = None, n_dimensional: bool = False, aggregation_method: Union[str, Callable] = 'list') -> pd.DataFrame

Consolidate rows in a DataFrame by merging duplicate values into lists or using a specified aggregation function.

Parameters:

Name Type Description Default
dataframe DataFrame

The DataFrame to consolidate.

required
group_by Optional[Union[str, List[str]]]

Columns or indices to group by.

None
pivot_columns Optional[Union[str, List[str]]]

Columns to pivot.

None
level Optional[int]

Level of multi-index to group by.

None
n_dimensional bool

If True, use a pivot table for N-Dimensional consolidation.

False
aggregation_method Union[str, Callable]

Aggregation method, e.g., 'list' to aggregate into lists.

'list'

Returns:

Type Description
DataFrame

pd.DataFrame: The consolidated DataFrame.

Example

df = pd.DataFrame({'A': [1, 1, 2], 'B': [4, 5, 6], 'C': [7, 8, 9]}) result = RasUtils.consolidate_dataframe(df, group_by='A') print(result) B C A
1 [4, 5] [7, 8] 2 [6] [9]

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def consolidate_dataframe(
    dataframe: pd.DataFrame,
    group_by: Optional[Union[str, List[str]]] = None,
    pivot_columns: Optional[Union[str, List[str]]] = None,
    level: Optional[int] = None,
    n_dimensional: bool = False,
    aggregation_method: Union[str, Callable] = 'list'
) -> pd.DataFrame:
    """
    Consolidate rows in a DataFrame by merging duplicate values into lists or using a specified aggregation function.

    Args:
        dataframe (pd.DataFrame): The DataFrame to consolidate.
        group_by (Optional[Union[str, List[str]]]): Columns or indices to group by.
        pivot_columns (Optional[Union[str, List[str]]]): Columns to pivot.
        level (Optional[int]): Level of multi-index to group by.
        n_dimensional (bool): If True, use a pivot table for N-Dimensional consolidation.
        aggregation_method (Union[str, Callable]): Aggregation method, e.g., 'list' to aggregate into lists.

    Returns:
        pd.DataFrame: The consolidated DataFrame.

    Example:
        >>> df = pd.DataFrame({'A': [1, 1, 2], 'B': [4, 5, 6], 'C': [7, 8, 9]})
        >>> result = RasUtils.consolidate_dataframe(df, group_by='A')
        >>> print(result)
        B         C
        A            
        1  [4, 5]  [7, 8]
        2  [6]     [9]
    """
    if aggregation_method == 'list':
        agg_func = lambda x: tuple(x)
    else:
        agg_func = aggregation_method

    if n_dimensional:
        result = dataframe.pivot_table(group_by, pivot_columns, aggfunc=agg_func)
    else:
        result = dataframe.groupby(group_by, level=level).agg(agg_func).map(list)

    return result
find_nearest_value staticmethod
Python
find_nearest_value(array: Union[list, ndarray], target_value: Union[int, float]) -> Union[int, float]

Finds the nearest value in a NumPy array to the specified target value.

Parameters:

Name Type Description Default
array Union[list, ndarray]

The array to search within.

required
target_value Union[int, float]

The value to find the nearest neighbor to.

required

Returns:

Type Description
Union[int, float]

Union[int, float]: The nearest value in the array to the specified target value.

Example

arr = np.array([1, 3, 5, 7, 9]) result = RasUtils.find_nearest_value(arr, 6) print(result) 5

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def find_nearest_value(array: Union[list, np.ndarray], target_value: Union[int, float]) -> Union[int, float]:
    """
    Finds the nearest value in a NumPy array to the specified target value.

    Args:
        array (Union[list, np.ndarray]): The array to search within.
        target_value (Union[int, float]): The value to find the nearest neighbor to.

    Returns:
        Union[int, float]: The nearest value in the array to the specified target value.

    Example:
        >>> arr = np.array([1, 3, 5, 7, 9])
        >>> result = RasUtils.find_nearest_value(arr, 6)
        >>> print(result)
        5
    """
    array = np.asarray(array)
    idx = (np.abs(array - target_value)).argmin()
    return array[idx]
horizontal_distance staticmethod
Python
horizontal_distance(coord1: ndarray, coord2: ndarray) -> float

Calculate the horizontal distance between two coordinate points.

Parameters:

Name Type Description Default
coord1 ndarray

First coordinate point [X, Y].

required
coord2 ndarray

Second coordinate point [X, Y].

required

Returns:

Name Type Description
float float

Horizontal distance.

Example

distance = RasUtils.horizontal_distance(np.array([0, 0]), np.array([3, 4])) print(distance) 5.0

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def horizontal_distance(coord1: np.ndarray, coord2: np.ndarray) -> float:
    """
    Calculate the horizontal distance between two coordinate points.

    Args:
        coord1 (np.ndarray): First coordinate point [X, Y].
        coord2 (np.ndarray): Second coordinate point [X, Y].

    Returns:
        float: Horizontal distance.

    Example:
        >>> distance = RasUtils.horizontal_distance(np.array([0, 0]), np.array([3, 4]))
        >>> print(distance)
        5.0
    """
    return np.linalg.norm(coord2 - coord1)
find_valid_ras_folders staticmethod
Python
find_valid_ras_folders(search_path: Union[str, Path], max_depth: Optional[int] = None, return_project_info: bool = False) -> Union[List[Path], List[Dict[str, Any]]]

Recursively search for valid HEC-RAS project folders.

A valid HEC-RAS project folder contains: 1. A .prj file with "Proj Title=" on the first line (HEC-RAS project file) 2. At least one .pXX file where XX is 01-99 (plan files)

This function does NOT require the global ras object to be initialized, making it suitable for discovery operations before project initialization.

Parameters:

Name Type Description Default
search_path Union[str, Path]

Root directory to search for HEC-RAS projects.

required
max_depth Optional[int]

Maximum folder depth to search. None means unlimited. Depth 0 = search_path only, 1 = immediate subdirectories, etc.

None
return_project_info bool

If True, return list of dicts with folder path, project name, prj file path, and plan count. If False, return list of Paths.

False

Returns:

Type Description
Union[List[Path], List[Dict[str, Any]]]

Union[List[Path], List[Dict[str, Any]]]: - If return_project_info=False: List of Path objects for valid HEC-RAS folders - If return_project_info=True: List of dicts with keys: - 'folder': Path to the project folder - 'project_name': Name extracted from .prj filename - 'prj_file': Path to the .prj file - 'plan_count': Number of plan files found - 'plan_numbers': List of plan numbers (e.g., ['01', '02', '15'])

Example
Find all valid HEC-RAS project folders

folders = RasUtils.find_valid_ras_folders("C:/Projects/Hydrology") for folder in folders: ... print(f"Found project: {folder}")

Get detailed info about each project

projects = RasUtils.find_valid_ras_folders( ... "C:/Projects", ... max_depth=3, ... return_project_info=True ... ) for proj in projects: ... print(f"{proj['project_name']}: {proj['plan_count']} plans")

Note

This function distinguishes HEC-RAS .prj files from ESRI projection files by checking for "Proj Title=" on the first line of the file.

Source code in ras_commander/RasUtils.py
Python
@staticmethod
def find_valid_ras_folders(
    search_path: Union[str, Path],
    max_depth: Optional[int] = None,
    return_project_info: bool = False
) -> Union[List[Path], List[Dict[str, Any]]]:
    """
    Recursively search for valid HEC-RAS project folders.

    A valid HEC-RAS project folder contains:
    1. A .prj file with "Proj Title=" on the first line (HEC-RAS project file)
    2. At least one .pXX file where XX is 01-99 (plan files)

    This function does NOT require the global ras object to be initialized,
    making it suitable for discovery operations before project initialization.

    Args:
        search_path (Union[str, Path]): Root directory to search for HEC-RAS projects.
        max_depth (Optional[int]): Maximum folder depth to search. None means unlimited.
            Depth 0 = search_path only, 1 = immediate subdirectories, etc.
        return_project_info (bool): If True, return list of dicts with folder path,
            project name, prj file path, and plan count. If False, return list of Paths.

    Returns:
        Union[List[Path], List[Dict[str, Any]]]:
            - If return_project_info=False: List of Path objects for valid HEC-RAS folders
            - If return_project_info=True: List of dicts with keys:
                - 'folder': Path to the project folder
                - 'project_name': Name extracted from .prj filename
                - 'prj_file': Path to the .prj file
                - 'plan_count': Number of plan files found
                - 'plan_numbers': List of plan numbers (e.g., ['01', '02', '15'])

    Example:
        >>> # Find all valid HEC-RAS project folders
        >>> folders = RasUtils.find_valid_ras_folders("C:/Projects/Hydrology")
        >>> for folder in folders:
        ...     print(f"Found project: {folder}")

        >>> # Get detailed info about each project
        >>> projects = RasUtils.find_valid_ras_folders(
        ...     "C:/Projects",
        ...     max_depth=3,
        ...     return_project_info=True
        ... )
        >>> for proj in projects:
        ...     print(f"{proj['project_name']}: {proj['plan_count']} plans")

    Note:
        This function distinguishes HEC-RAS .prj files from ESRI projection files
        by checking for "Proj Title=" on the first line of the file.
    """
    search_path = Path(search_path)
    if not search_path.exists():
        logger.warning(f"Search path does not exist: {search_path}")
        return []

    if not search_path.is_dir():
        logger.warning(f"Search path is not a directory: {search_path}")
        return []

    valid_folders = []

    def is_valid_ras_prj(prj_file: Path) -> bool:
        """Check if a .prj file is a valid HEC-RAS project file."""
        try:
            with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                first_line = f.readline()
                return first_line.strip().startswith("Proj Title=")
        except Exception as e:
            logger.debug(f"Could not read .prj file {prj_file}: {e}")
            return False

    def get_plan_files(folder: Path) -> List[Tuple[str, Path]]:
        """Get all valid plan files (.p01 to .p99) in a folder."""
        plan_files = []
        for i in range(1, 100):
            plan_num = f"{i:02d}"
            # Look for files matching *.pXX pattern
            for pfile in folder.glob(f"*.p{plan_num}"):
                plan_files.append((plan_num, pfile))
        return plan_files

    def check_folder(folder: Path) -> Optional[Dict[str, Any]]:
        """Check if a folder is a valid HEC-RAS project folder."""
        # Find .prj files
        prj_files = list(folder.glob("*.prj"))

        if not prj_files:
            return None

        # Find valid HEC-RAS .prj file (not ESRI projection file)
        valid_prj = None
        for prj_file in prj_files:
            if is_valid_ras_prj(prj_file):
                valid_prj = prj_file
                break

        if valid_prj is None:
            return None

        # Check for plan files
        plan_files = get_plan_files(folder)
        if not plan_files:
            return None

        # This is a valid HEC-RAS project folder
        return {
            'folder': folder,
            'project_name': valid_prj.stem,
            'prj_file': valid_prj,
            'plan_count': len(plan_files),
            'plan_numbers': [pn for pn, _ in plan_files]
        }

    def scan_directory(current_path: Path, current_depth: int):
        """Recursively scan directories for HEC-RAS projects."""
        # Check if we've exceeded max depth
        if max_depth is not None and current_depth > max_depth:
            return

        # Check current folder
        result = check_folder(current_path)
        if result:
            valid_folders.append(result)
            # Don't search subdirectories of a valid project folder
            # (nested projects are uncommon and would cause confusion)
            return

        # Scan subdirectories
        try:
            for item in current_path.iterdir():
                if item.is_dir() and not item.name.startswith('.'):
                    scan_directory(item, current_depth + 1)
        except PermissionError:
            logger.debug(f"Permission denied accessing: {current_path}")
        except Exception as e:
            logger.debug(f"Error scanning {current_path}: {e}")

    # Start scanning
    logger.info(f"Searching for HEC-RAS projects in: {search_path}")
    scan_directory(search_path, 0)
    logger.info(f"Found {len(valid_folders)} valid HEC-RAS project folders")

    if return_project_info:
        return valid_folders
    else:
        return [info['folder'] for info in valid_folders]
is_valid_ras_folder staticmethod
Python
is_valid_ras_folder(folder_path: Union[str, Path]) -> bool

Check if a single folder is a valid HEC-RAS project folder.

A valid HEC-RAS project folder contains: 1. A .prj file with "Proj Title=" on the first line 2. At least one .pXX file where XX is 01-99

This function does NOT require the global ras object to be initialized.

Parameters:

Name Type Description Default
folder_path Union[str, Path]

Path to the folder to check.

required

Returns:

Name Type Description
bool bool

True if the folder is a valid HEC-RAS project folder.

Example

if RasUtils.is_valid_ras_folder("C:/Projects/MyRASModel"): ... print("This is a valid HEC-RAS project folder") ... else: ... print("Not a valid HEC-RAS project folder")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
def is_valid_ras_folder(folder_path: Union[str, Path]) -> bool:
    """
    Check if a single folder is a valid HEC-RAS project folder.

    A valid HEC-RAS project folder contains:
    1. A .prj file with "Proj Title=" on the first line
    2. At least one .pXX file where XX is 01-99

    This function does NOT require the global ras object to be initialized.

    Args:
        folder_path (Union[str, Path]): Path to the folder to check.

    Returns:
        bool: True if the folder is a valid HEC-RAS project folder.

    Example:
        >>> if RasUtils.is_valid_ras_folder("C:/Projects/MyRASModel"):
        ...     print("This is a valid HEC-RAS project folder")
        ... else:
        ...     print("Not a valid HEC-RAS project folder")
    """
    folder_path = Path(folder_path)
    if not folder_path.exists() or not folder_path.is_dir():
        return False

    # Find .prj files
    prj_files = list(folder_path.glob("*.prj"))
    if not prj_files:
        return False

    # Check if any .prj file is a valid HEC-RAS project file
    def is_valid_ras_prj(prj_file: Path) -> bool:
        try:
            with open(prj_file, 'r', encoding='utf-8', errors='replace') as f:
                first_line = f.readline()
                return first_line.strip().startswith("Proj Title=")
        except Exception:
            return False

    has_valid_prj = any(is_valid_ras_prj(pf) for pf in prj_files)
    if not has_valid_prj:
        return False

    # Check for at least one plan file (.p01 to .p99)
    for i in range(1, 100):
        plan_num = f"{i:02d}"
        if list(folder_path.glob(f"*.p{plan_num}")):
            return True

    return False
safe_write_geometry staticmethod
Python
safe_write_geometry(geom_file: Union[str, Path], modified_lines: List[str], create_backup: bool = True) -> Optional[Path]

Atomically write geometry file with backup for safe file modification.

This function implements safe file modification for HEC-RAS geometry files, ensuring data integrity through atomic operations and optional backup creation.

Process
  1. Create timestamped backup: geom_file.YYYYMMDD_HHMMSS.bak
  2. Write to temp file: geom_file.tmp
  3. Basic validation (line count reasonable, file size reasonable)
  4. Atomic rename temp -> original (os.replace)
  5. Return backup path

Parameters:

Name Type Description Default
geom_file Union[str, Path]

Path to the geometry file to write.

required
modified_lines List[str]

List of lines to write to the file. Each line should include newline characters if needed.

required
create_backup bool

If True, create timestamped backup before modification. Defaults to True for safety.

True

Returns:

Type Description
Optional[Path]

Optional[Path]: Path to backup file if create_backup=True and successful, None if create_backup=False or file didn't exist before.

Raises:

Type Description
FileNotFoundError

If the geometry file doesn't exist (for modification).

PermissionError

If write access is denied to the file or directory.

ValueError

If modified_lines is empty or validation fails.

IOError

If atomic rename fails.

Example

from ras_commander import RasUtils from pathlib import Path

Read geometry file

geom_file = Path("project/geometry.g01") with open(geom_file, 'r') as f: ... lines = f.readlines()

Modify HTAB parameters (example)

modified_lines = modify_htab_params(lines, starting_el=580.0)

Safe write with backup

backup_path = RasUtils.safe_write_geometry(geom_file, modified_lines) print(f"Backup created at: {backup_path}")

Notes
  • This function uses os.replace() for atomic rename, which is atomic on both Windows (NTFS) and Unix filesystems.
  • Backup files use format: filename.YYYYMMDD_HHMMSS.bak
  • If validation fails, temp file is deleted and original remains unchanged.
  • For rollback, use rollback_geometry() with the returned backup path.
See Also
  • rollback_geometry: Restore from backup after failed modification
  • .claude/rules/python/path-handling.md: Path handling patterns
Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def safe_write_geometry(
    geom_file: Union[str, Path],
    modified_lines: List[str],
    create_backup: bool = True
) -> Optional[Path]:
    """
    Atomically write geometry file with backup for safe file modification.

    This function implements safe file modification for HEC-RAS geometry files,
    ensuring data integrity through atomic operations and optional backup creation.

    Process:
        1. Create timestamped backup: geom_file.YYYYMMDD_HHMMSS.bak
        2. Write to temp file: geom_file.tmp
        3. Basic validation (line count reasonable, file size reasonable)
        4. Atomic rename temp -> original (os.replace)
        5. Return backup path

    Parameters:
        geom_file (Union[str, Path]): Path to the geometry file to write.
        modified_lines (List[str]): List of lines to write to the file.
            Each line should include newline characters if needed.
        create_backup (bool): If True, create timestamped backup before modification.
            Defaults to True for safety.

    Returns:
        Optional[Path]: Path to backup file if create_backup=True and successful,
            None if create_backup=False or file didn't exist before.

    Raises:
        FileNotFoundError: If the geometry file doesn't exist (for modification).
        PermissionError: If write access is denied to the file or directory.
        ValueError: If modified_lines is empty or validation fails.
        IOError: If atomic rename fails.

    Example:
        >>> from ras_commander import RasUtils
        >>> from pathlib import Path
        >>>
        >>> # Read geometry file
        >>> geom_file = Path("project/geometry.g01")
        >>> with open(geom_file, 'r') as f:
        ...     lines = f.readlines()
        >>>
        >>> # Modify HTAB parameters (example)
        >>> modified_lines = modify_htab_params(lines, starting_el=580.0)
        >>>
        >>> # Safe write with backup
        >>> backup_path = RasUtils.safe_write_geometry(geom_file, modified_lines)
        >>> print(f"Backup created at: {backup_path}")

    Notes:
        - This function uses os.replace() for atomic rename, which is atomic on
          both Windows (NTFS) and Unix filesystems.
        - Backup files use format: filename.YYYYMMDD_HHMMSS.bak
        - If validation fails, temp file is deleted and original remains unchanged.
        - For rollback, use rollback_geometry() with the returned backup path.

    See Also:
        - rollback_geometry: Restore from backup after failed modification
        - .claude/rules/python/path-handling.md: Path handling patterns
    """
    geom_file = Path(geom_file)
    backup_path = None
    temp_path = None

    # Validate inputs
    if not modified_lines:
        raise ValueError("modified_lines cannot be empty")

    # Verify original file exists (we're modifying, not creating)
    if not geom_file.exists():
        raise FileNotFoundError(f"Geometry file not found: {geom_file}")

    # Check write permissions
    if not os.access(geom_file.parent, os.W_OK):
        raise PermissionError(f"Write permission denied for directory: {geom_file.parent}")

    try:
        # Read original file for validation comparison
        original_size = geom_file.stat().st_size
        with open(geom_file, 'r', encoding='utf-8', errors='replace') as f:
            original_line_count = sum(1 for _ in f)

        # Step 1: Create timestamped backup
        if create_backup:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = geom_file.parent / f"{geom_file.name}.{timestamp}.bak"

            # Copy original to backup
            shutil.copy2(geom_file, backup_path)
            logger.info(f"Backup created: {backup_path}")

        # Step 2: Write to temp file
        temp_path = geom_file.parent / f"{geom_file.name}.tmp"
        with open(temp_path, 'w', encoding='utf-8', newline='') as f:
            f.writelines(modified_lines)
        logger.debug(f"Temp file written: {temp_path}")

        # Step 3: Basic validation
        temp_size = temp_path.stat().st_size
        new_line_count = len(modified_lines)

        # Validation: File shouldn't be empty
        if temp_size == 0:
            raise ValueError("Modified file would be empty - validation failed")

        # Validation: Line count shouldn't change drastically (>50% reduction suspicious)
        if new_line_count < original_line_count * 0.5:
            raise ValueError(
                f"Line count reduced drastically ({original_line_count} -> {new_line_count}). "
                f"This may indicate data corruption. Aborting."
            )

        # Validation: File size shouldn't shrink too much (>80% reduction suspicious)
        if temp_size < original_size * 0.2:
            raise ValueError(
                f"File size reduced drastically ({original_size} -> {temp_size} bytes). "
                f"This may indicate data corruption. Aborting."
            )

        logger.debug(
            f"Validation passed: {new_line_count} lines, {temp_size} bytes "
            f"(original: {original_line_count} lines, {original_size} bytes)"
        )

        # Step 4: Atomic rename temp -> original
        # os.replace() is atomic on both Windows (NTFS) and Unix
        os.replace(temp_path, geom_file)
        temp_path = None  # Mark as successfully moved
        logger.info(f"Geometry file atomically updated: {geom_file}")

        return backup_path

    except Exception as e:
        # Clean up temp file if it exists
        if temp_path and temp_path.exists():
            try:
                temp_path.unlink()
                logger.debug(f"Cleaned up temp file: {temp_path}")
            except Exception as cleanup_error:
                logger.warning(f"Failed to clean up temp file {temp_path}: {cleanup_error}")

        logger.error(f"Failed to write geometry file {geom_file}: {e}")
        raise
rollback_geometry staticmethod
Python
rollback_geometry(geom_file: Union[str, Path], backup_path: Union[str, Path]) -> None

Restore geometry file from backup after failed modification.

This function restores a geometry file from a previously created backup, typically used when a modification operation fails or produces incorrect results.

Process
  1. Verify backup file exists
  2. Copy backup -> original (preserves backup for safety)
  3. Log restoration

Parameters:

Name Type Description Default
geom_file Union[str, Path]

Path to the geometry file to restore.

required
backup_path Union[str, Path]

Path to the backup file created by safe_write_geometry().

required

Returns:

Type Description
None

None

Raises:

Type Description
FileNotFoundError

If backup file doesn't exist.

PermissionError

If write access is denied.

IOError

If copy operation fails.

Example

from ras_commander import RasUtils from pathlib import Path

Attempt modification

try: ... backup = RasUtils.safe_write_geometry(geom_file, modified_lines) ... # Run HEC-RAS to validate ... RasCmdr.compute_plan("01", clear_geompre=True) ... except Exception as e: ... # Modification failed - rollback ... if backup: ... RasUtils.rollback_geometry(geom_file, backup) ... print("Geometry file restored from backup") ... raise

Notes
  • This function copies the backup to original, preserving the backup.
  • Use shutil.copy2() to preserve file metadata (timestamps, permissions).
  • After successful rollback, you may want to delete the backup manually if no longer needed.
See Also
  • safe_write_geometry: Create backup and safely write modifications
Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def rollback_geometry(
    geom_file: Union[str, Path],
    backup_path: Union[str, Path]
) -> None:
    """
    Restore geometry file from backup after failed modification.

    This function restores a geometry file from a previously created backup,
    typically used when a modification operation fails or produces incorrect results.

    Process:
        1. Verify backup file exists
        2. Copy backup -> original (preserves backup for safety)
        3. Log restoration

    Parameters:
        geom_file (Union[str, Path]): Path to the geometry file to restore.
        backup_path (Union[str, Path]): Path to the backup file created by
            safe_write_geometry().

    Returns:
        None

    Raises:
        FileNotFoundError: If backup file doesn't exist.
        PermissionError: If write access is denied.
        IOError: If copy operation fails.

    Example:
        >>> from ras_commander import RasUtils
        >>> from pathlib import Path
        >>>
        >>> # Attempt modification
        >>> try:
        ...     backup = RasUtils.safe_write_geometry(geom_file, modified_lines)
        ...     # Run HEC-RAS to validate
        ...     RasCmdr.compute_plan("01", clear_geompre=True)
        ... except Exception as e:
        ...     # Modification failed - rollback
        ...     if backup:
        ...         RasUtils.rollback_geometry(geom_file, backup)
        ...         print("Geometry file restored from backup")
        ...     raise

    Notes:
        - This function copies the backup to original, preserving the backup.
        - Use shutil.copy2() to preserve file metadata (timestamps, permissions).
        - After successful rollback, you may want to delete the backup manually
          if no longer needed.

    See Also:
        - safe_write_geometry: Create backup and safely write modifications
    """
    geom_file = Path(geom_file)
    backup_path = Path(backup_path)

    # Verify backup exists
    if not backup_path.exists():
        raise FileNotFoundError(f"Backup file not found: {backup_path}")

    # Check write permissions
    if geom_file.exists() and not os.access(geom_file, os.W_OK):
        raise PermissionError(f"Write permission denied for file: {geom_file}")

    if not os.access(geom_file.parent, os.W_OK):
        raise PermissionError(f"Write permission denied for directory: {geom_file.parent}")

    try:
        # Copy backup to original (preserves backup for safety)
        shutil.copy2(backup_path, geom_file)
        logger.info(f"Geometry file restored from backup: {geom_file} <- {backup_path}")

    except Exception as e:
        logger.error(f"Failed to restore geometry file {geom_file} from {backup_path}: {e}")
        raise
validate_geometry_file_basic staticmethod
Python
validate_geometry_file_basic(geom_file: Union[str, Path], min_lines: int = 10, required_patterns: Optional[List[str]] = None) -> bool

Perform basic validation on a geometry file.

This function checks that a geometry file meets basic structural requirements, useful for pre-modification validation or post-write verification.

Parameters:

Name Type Description Default
geom_file Union[str, Path]

Path to the geometry file to validate.

required
min_lines int

Minimum number of lines expected. Defaults to 10.

10
required_patterns Optional[List[str]]

List of strings that must appear somewhere in the file. Defaults to ["River Reach="] for HEC-RAS geometry.

None

Returns:

Name Type Description
bool bool

True if validation passes, False otherwise.

Example

if RasUtils.validate_geometry_file_basic(geom_file): ... print("Geometry file appears valid")

Custom validation

if RasUtils.validate_geometry_file_basic( ... geom_file, ... required_patterns=["River Reach=", "Type RM Length"] ... ): ... print("Geometry file has cross sections")

Notes
  • This is a basic structural check, not a full HEC-RAS validation.
  • For comprehensive validation, use HEC-RAS geometric preprocessor.
Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def validate_geometry_file_basic(
    geom_file: Union[str, Path],
    min_lines: int = 10,
    required_patterns: Optional[List[str]] = None
) -> bool:
    """
    Perform basic validation on a geometry file.

    This function checks that a geometry file meets basic structural requirements,
    useful for pre-modification validation or post-write verification.

    Parameters:
        geom_file (Union[str, Path]): Path to the geometry file to validate.
        min_lines (int): Minimum number of lines expected. Defaults to 10.
        required_patterns (Optional[List[str]]): List of strings that must appear
            somewhere in the file. Defaults to ["River Reach="] for HEC-RAS geometry.

    Returns:
        bool: True if validation passes, False otherwise.

    Example:
        >>> if RasUtils.validate_geometry_file_basic(geom_file):
        ...     print("Geometry file appears valid")
        >>>
        >>> # Custom validation
        >>> if RasUtils.validate_geometry_file_basic(
        ...     geom_file,
        ...     required_patterns=["River Reach=", "Type RM Length"]
        ... ):
        ...     print("Geometry file has cross sections")

    Notes:
        - This is a basic structural check, not a full HEC-RAS validation.
        - For comprehensive validation, use HEC-RAS geometric preprocessor.
    """
    geom_file = Path(geom_file)

    if required_patterns is None:
        # Default: Check for River Reach definition (present in most geometry files)
        required_patterns = ["River Reach="]

    if not geom_file.exists():
        logger.warning(f"Geometry file does not exist: {geom_file}")
        return False

    try:
        with open(geom_file, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
            lines = content.splitlines()

        # Check minimum line count
        if len(lines) < min_lines:
            logger.warning(
                f"Geometry file has too few lines: {len(lines)} < {min_lines}"
            )
            return False

        # Check required patterns
        for pattern in required_patterns:
            if pattern not in content:
                logger.warning(f"Required pattern not found in geometry file: {pattern}")
                return False

        logger.debug(f"Geometry file validation passed: {geom_file}")
        return True

    except Exception as e:
        logger.error(f"Error validating geometry file {geom_file}: {e}")
        return False
dos2unix staticmethod
Python
dos2unix(project_dir: Union[str, Path], extensions: Optional[List[str]] = None) -> int

Convert CRLF line endings to LF in HEC-RAS text files.

Processes .b## and .g## files by default (boundary and geometry text files that need LF endings for Linux HEC-RAS execution). Done in-place using pure Python (no shell dependency).

Attribution: Implementation pattern derived from ras-agent (https://github.com/gheistand/ras-agent) by Glenn Heistand / CHAMP — Illinois State Water Survey. See runner.py:_dos2unix_dir().

Parameters:

Name Type Description Default
project_dir Union[str, Path]

Path to the HEC-RAS project directory.

required
extensions Optional[List[str]]

Custom regex patterns for file extensions to process. Defaults to [r'.(b|g)\d+$'] which matches .b01, .g01, etc.

None

Returns:

Name Type Description
int int

Number of files modified.

Example

from ras_commander import RasUtils count = RasUtils.dos2unix(Path("/project/dir")) print(f"Converted {count} files")

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def dos2unix(project_dir: Union[str, Path], extensions: Optional[List[str]] = None) -> int:
    """
    Convert CRLF line endings to LF in HEC-RAS text files.

    Processes .b## and .g## files by default (boundary and geometry text files
    that need LF endings for Linux HEC-RAS execution). Done in-place using
    pure Python (no shell dependency).

    Attribution: Implementation pattern derived from ras-agent
    (https://github.com/gheistand/ras-agent) by Glenn Heistand / CHAMP —
    Illinois State Water Survey. See runner.py:_dos2unix_dir().

    Parameters:
        project_dir (Union[str, Path]): Path to the HEC-RAS project directory.
        extensions (Optional[List[str]]): Custom regex patterns for file extensions
            to process. Defaults to [r'\\.(b|g)\\d+$'] which matches .b01, .g01, etc.

    Returns:
        int: Number of files modified.

    Example:
        >>> from ras_commander import RasUtils
        >>> count = RasUtils.dos2unix(Path("/project/dir"))
        >>> print(f"Converted {count} files")
    """
    import re

    project_dir = Path(project_dir)
    if not project_dir.is_dir():
        raise FileNotFoundError(f"Directory not found: {project_dir}")

    if extensions is None:
        patterns = [re.compile(r'\.(b|g)\d+$', re.IGNORECASE)]
    else:
        patterns = [re.compile(ext, re.IGNORECASE) for ext in extensions]

    modified_count = 0
    for fpath in project_dir.iterdir():
        if not fpath.is_file():
            continue
        if not any(p.search(fpath.name) for p in patterns):
            continue
        try:
            raw = fpath.read_bytes()
            if b'\r' in raw:
                fpath.write_bytes(raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n'))
                modified_count += 1
                logger.debug(f"dos2unix: {fpath.name}")
        except (OSError, PermissionError) as exc:
            logger.warning(f"dos2unix skipped {fpath.name}: {exc}")

    logger.info(f"dos2unix: converted {modified_count} files in {project_dir}")
    return modified_count
discover_ras_versions staticmethod
Python
discover_ras_versions() -> Dict[str, Path]

Discover installed HEC-RAS versions by scanning Windows Registry, filesystem, and Wine prefixes (on Linux).

Resolution order: 1. Windows Registry (HKLM, WOW6432Node, HKCU) -- Windows only 2. Standard filesystem paths (Program Files) -- Windows only 3. Native Linux installs (/opt/hecras/, /opt/HEC-RAS/, ~/hecras/, or $RAS_COMMANDER_LINUX_RAS_ROOT) -- Linux only 4. Wine prefix paths (~/.wine, /opt/hecras-wine, etc.) -- Linux only

Returns:

Name Type Description
Dict[str, Path]

Dict[str, Path]: Mapping of version string -> Path to the executable.

Dict[str, Path]

On Windows/Wine this is Ras.exe; for native Linux installs there

Dict[str, Path]

is no Ras.exe, so it maps to the RasUnsteady solver binary (use

Dict[str, Path]

.parent as ras_exe_dir for RasCmdr.compute_plan_linux).

Example Dict[str, Path]

{"6.6": Path("C:/Program Files (x86)/HEC/HEC-RAS/6.6/Ras.exe")}

Source code in ras_commander/RasUtils.py
Python
@staticmethod
@log_call
def discover_ras_versions() -> Dict[str, Path]:
    """
    Discover installed HEC-RAS versions by scanning Windows Registry,
    filesystem, and Wine prefixes (on Linux).

    Resolution order:
    1. Windows Registry (HKLM, WOW6432Node, HKCU) -- Windows only
    2. Standard filesystem paths (Program Files) -- Windows only
    3. Native Linux installs (/opt/hecras/<ver>, /opt/HEC-RAS/<ver>,
       ~/hecras/<ver>, or $RAS_COMMANDER_LINUX_RAS_ROOT) -- Linux only
    4. Wine prefix paths (~/.wine, /opt/hecras-wine, etc.) -- Linux only

    Returns:
        Dict[str, Path]: Mapping of version string -> Path to the executable.
        On Windows/Wine this is ``Ras.exe``; for native Linux installs there
        is no Ras.exe, so it maps to the ``RasUnsteady`` solver binary (use
        ``.parent`` as ``ras_exe_dir`` for ``RasCmdr.compute_plan_linux``).
        Example: {"6.6": Path("C:/Program Files (x86)/HEC/HEC-RAS/6.6/Ras.exe")}
    """
    discovered: Dict[str, Path] = {}

    # Version folder names matching RasPrj.get_ras_exe()
    ras_version_folders = [
        "7.0", "6.7 Beta 5", "6.7 Beta 4", "6.6", "6.5", "6.4.1", "6.3.1", "6.3", "6.2",
        "6.1", "6.0", "5.0.7", "5.0.6", "5.0.5", "5.0.4", "5.0.3",
        "5.0.1", "5.0", "4.1.0", "4.0"
    ]

    version_aliases = {
        "4.1": "4.1.0", "41": "4.1.0", "410": "4.1.0",
        "40": "4.0", "50": "5.0", "501": "5.0.1", "503": "5.0.3",
        "504": "5.0.4", "505": "5.0.5", "506": "5.0.6", "507": "5.0.7",
        "60": "6.0", "61": "6.1", "62": "6.2", "63": "6.3",
        "631": "6.3.1", "6.4": "6.4.1", "64": "6.4.1", "641": "6.4.1",
        "65": "6.5", "66": "6.6", "6.7": "6.7 Beta 5", "67": "6.7 Beta 5",
        "70": "7.0",
    }

    def _normalize_version(raw: str, install_dir: Optional[Path] = None) -> str:
        v = str(raw).strip()
        if v in version_aliases:
            return version_aliases[v]
        if install_dir is not None:
            fn = install_dir.name.strip()
            if fn in version_aliases:
                return version_aliases[fn]
            if fn in ras_version_folders:
                return fn
        return v

    def _add(version: str, exe_path: Path, source: str) -> None:
        if version in discovered:
            logger.debug(f"Skipping duplicate HEC-RAS {version} from {source}")
            return
        discovered[version] = exe_path
        logger.info(f"Discovered HEC-RAS {version} at {exe_path} via {source}")

    def _scan_root(root_dir: Path, source_label: str) -> None:
        """Scan a directory containing versioned HEC-RAS subfolders."""
        if not root_dir.exists():
            return
        # Check known folder names first
        for folder_name in ras_version_folders:
            exe = root_dir / folder_name / "Ras.exe"
            if exe.is_file():
                v = _normalize_version(folder_name, exe.parent)
                _add(v, exe, source_label)
        # Glob for any other folders with Ras.exe
        try:
            for exe in sorted(root_dir.glob("*/Ras.exe")):
                v = _normalize_version(exe.parent.name, exe.parent)
                _add(v, exe, source_label)
        except OSError as exc:
            logger.warning(f"Filesystem scan failed for {root_dir}: {exc}")

    # --- Windows: Registry + Program Files ---
    if os.name == 'nt':
        # Registry scan
        try:
            import winreg

            def _is_no_more(exc: OSError) -> bool:
                return getattr(exc, "winerror", None) == 259

            hive_map = {
                "HKLM": winreg.HKEY_LOCAL_MACHINE,
                "HKCU": winreg.HKEY_CURRENT_USER,
            }
            registry_locations = [
                ("HKLM", r"SOFTWARE\HEC\HEC-RAS"),
                ("HKLM", r"SOFTWARE\WOW6432Node\HEC\HEC-RAS"),
                ("HKCU", r"SOFTWARE\HEC\HEC-RAS"),
            ]
            install_value_names = (
                "InstallDir", "InstallPath", "Install Path",
                "Path", "ExePath", "RasExePath",
            )

            for hive_name, subkey_path in registry_locations:
                try:
                    with winreg.OpenKey(hive_map[hive_name], subkey_path) as root_key:
                        idx = 0
                        while True:
                            try:
                                vk_name = winreg.EnumKey(root_key, idx)
                            except OSError as exc:
                                if _is_no_more(exc):
                                    break
                                break
                            idx += 1
                            try:
                                with winreg.OpenKey(root_key, vk_name) as vk:
                                    install_val = None
                                    for val_name in install_value_names:
                                        try:
                                            val, _ = winreg.QueryValueEx(vk, val_name)
                                            if val:
                                                install_val = str(val)
                                                break
                                        except (FileNotFoundError, OSError):
                                            continue
                                    if install_val:
                                        p = Path(os.path.expandvars(install_val.strip().strip('"')))
                                        if p.suffix.lower() != '.exe':
                                            p = p / "Ras.exe"
                                        if p.name.lower() == "ras.exe" and p.is_file():
                                            v = _normalize_version(vk_name, p.parent)
                                            _add(v, p, f"registry {hive_name}\\{subkey_path}")
                            except (FileNotFoundError, OSError):
                                continue
                except (FileNotFoundError, OSError):
                    continue
        except ImportError:
            logger.debug("winreg not available, skipping registry scan")

        # Filesystem scan (standard Windows paths)
        _scan_root(Path("C:/Program Files (x86)/HEC/HEC-RAS"), "filesystem (x86)")
        _scan_root(Path("C:/Program Files/HEC/HEC-RAS"), "filesystem")

    # --- Linux: native install scan ---
    else:
        # Native Linux HEC-RAS installs have no Ras.exe; the RasUnsteady
        # solver binary is the executable. Maps version -> RasUnsteady path
        # (callers for compute_plan_linux use ``.parent`` as ras_exe_dir).
        # Roots are configurable via $RAS_COMMANDER_LINUX_RAS_ROOT (CLB-883).
        linux_native_roots = [
            Path(os.path.expanduser("~/hecras")),
            Path("/opt/hecras"),
            Path("/opt/HEC-RAS"),
        ]
        env_root = os.environ.get("RAS_COMMANDER_LINUX_RAS_ROOT")
        if env_root:
            linux_native_roots.insert(0, Path(env_root))
        for _folder, _exe in RasUtils._scan_native_linux_ras(linux_native_roots).items():
            _add(_normalize_version(_folder, _exe.parent), _exe, "linux native")

        # --- Linux: Wine prefix scan ---
        wine_prefix_candidates = [
            Path(os.path.expanduser("~/.wine")),
            Path("/opt/hecras-wine"),
            Path(os.path.expanduser("~/hecras-wine")),
        ]
        # Also check WINEPREFIX env var
        env_prefix = os.environ.get("WINEPREFIX")
        if env_prefix:
            wine_prefix_candidates.insert(0, Path(env_prefix))

        for prefix in wine_prefix_candidates:
            drive_c = prefix / "drive_c"
            if not drive_c.exists():
                continue
            logger.debug(f"Scanning Wine prefix: {prefix}")
            # Standard HEC-RAS locations under drive_c
            _scan_root(drive_c / "Program Files (x86)" / "HEC" / "HEC-RAS", f"wine {prefix}")
            _scan_root(drive_c / "Program Files" / "HEC" / "HEC-RAS", f"wine {prefix}")
            _scan_root(drive_c / "HEC-RAS", f"wine {prefix}")

    logger.info(f"Discovered {len(discovered)} installed HEC-RAS version(s)")
    return discovered

Method Categories

File Operations
Method Description
create_directory(path) Ensure directory exists, create if needed
find_files_by_extension(folder, ext) Find all files with given extension
get_file_size(path) Get file size in bytes
get_file_modification_time(path) Get file modification timestamp
clone_file(src, dest) Copy file to new location
update_file(path, content) Write content to file
remove_with_retry(path, retries=3) Delete file with retry logic
check_file_access(path, mode) Verify file access permissions
Plan/Project Helpers
Method Description
normalize_ras_number(number) Convert "1", "01", "p01" to "01" format
get_plan_path(plan_number) Get full path to plan file
get_next_number(folder, prefix) Find next available plan/geom number
update_plan_file(path, key, value) Update single key in plan file
update_project_file(prj_path, updates) Batch update .prj file
Data Conversion
Method Description
convert_to_dataframe(path) Load CSV/Excel to DataFrame
save_to_excel(df, path, sheet) Save DataFrame to Excel
decode_byte_strings(data) Decode HDF byte strings to Python strings
consolidate_dataframe(df, group_by) Group and aggregate DataFrame rows
Statistical Analysis
Method Description
calculate_rmse(observed, predicted) Root Mean Square Error
calculate_percent_bias(obs, pred) Percent bias metric
calculate_error_metrics(obs, pred) All metrics (RMSE, NSE, PBIAS, R²)
Python
from ras_commander import RasUtils
import numpy as np

observed = np.array([100, 120, 140, 160, 180])
predicted = np.array([105, 125, 135, 165, 175])

metrics = RasUtils.calculate_error_metrics(observed, predicted)
print(f"RMSE: {metrics['rmse']:.2f}")
print(f"NSE: {metrics['nse']:.3f}")
print(f"PBIAS: {metrics['pbias']:.1f}%")
Spatial Operations
Method Description
perform_kdtree_query(points, query, max_dist) Find nearest points using KDTree
find_nearest_neighbors(points, max_dist) Find nearest neighbor for each point
find_nearest_value(array, target) Find value closest to target
horizontal_distance(p1, p2) Calculate 2D distance between points
Python
from ras_commander import RasUtils
import numpy as np

# Find nearest mesh cell for a list of query points
mesh_centroids = np.array([[0, 0], [10, 10], [20, 20]])
query_points = np.array([[5, 5], [15, 15]])

indices = RasUtils.perform_kdtree_query(
    mesh_centroids,
    query_points,
    max_distance=10.0
)
# Returns [1, 2] - nearest cell indices

RasExamples

RasExamples

RasExamples - Manage and load HEC-RAS example projects for testing and development

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL - Logs are written to both console and a rotating file handler. - The default log file is 'ras_commander.log' in the 'logs' directory. - The default log level is INFO.

To use logging in this module: 1. Use the @log_call decorator for automatic function call logging. 2. For additional logging, use logger.level calls (e.g., logger.info(), logger.debug()). 3. Obtain the logger using: logger = logging.getLogger(name)

Example

@log_call def my_function(): logger = logging.getLogger(name) logger.debug("Additional debug information") # Function logic here


All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasExamples: - get_example_projects() - list_categories() - list_projects() - extract_project() - is_project_extracted() - clean_projects_directory()

RasMap

RasMap

RasMap - Parses HEC-RAS mapper configuration files (.rasmap)

This module provides functionality to extract and organize information from HEC-RAS mapper configuration files, including paths to terrain, soil, and land cover data. It also includes functions to automate the post-processing of stored maps.

This module is part of the ras-commander library and uses a centralized logging configuration.

Logging Configuration: - The logging is set up in the logging_config.py file. - A @log_call decorator is available to automatically log function calls. - Log levels: DEBUG, INFO, WARNING, ERROR, CRITICAL

Classes:

Name Description
RasMap

Class for parsing and accessing HEC-RAS mapper configuration.


All of the methods in this class are static and are designed to be used without instantiation.

List of Functions in RasMap: - parse_rasmap(): Parse a .rasmap file and extract relevant information - get_rasmap_path(): Get the path to the .rasmap file based on the current project - initialize_rasmap_df(): Initialize the rasmap_df as part of project initialization - get_terrain_names(): Extracts terrain layer names from a given .rasmap file - list_map_layers(): List all map layers in the RASMapper configuration file - list_reference_map_layers(): List shapefile/GeoJSON reference map layers - list_basemap_layers(): List standard basemap layers registered in .rasmap - set_map_layer_visibility(): Toggle reference, basemap, and land-classification map layers - add_reference_map_layer(): Add a shapefile/GeoJSON reference map layer - add_basemap_layer(): Add a standard basemap layer - add_map_layer(): Add a map layer to the RASMapper configuration file - remove_map_layer(): Remove a map layer from the RASMapper configuration file - list_geometry_layers(): List top-level geometries and child geometry elements - list_geometry_features(): List HDF geometry features inside a layer - list_land_classification_polygons(): List sidecar classification polygon overrides - add_land_classification_polygon(): Add sidecar classification polygon override - update_land_classification_polygon(): Update sidecar classification polygon override - delete_land_classification_polygon(): Delete sidecar classification polygon override - set_geometry_layer_visibility(): Toggle child geometry elements such as mesh, XS, and structures - list_result_layers(): List RASMapper result plan and child layers - set_result_layer_visibility(): Toggle result plan and result child layers - get_current_view(): Read the RASMapper CurrentView bounds - set_current_view(): Write the RASMapper CurrentView bounds - set_terrain_layer_visibility(): Toggle terrain layers for RASMapper inspection - list_terrain_display_settings(): List persisted terrain display settings - get_terrain_display_settings(): Read persisted terrain display settings for one terrain - set_terrain_display_settings(): Write persisted terrain display settings - set_update_legend_with_view(): Enable viewport-updated legends on raster surface layers - zoom_to_geometry_layer(): Zoom CurrentView to HDF-derived geometry element extents - get_geometry_feature_bounds(): Get HDF-derived extents for a selected feature - open_rasmapper(): Launch standalone RasMapper.exe against the project .rasmap - capture_rasmapper_snapshot(): Capture a visible RASMapper window screenshot - create_spatial_review_package(): Build a RASMapper QA/QC evidence bundle - postprocess_stored_maps(): Automates the generation of stored floodplain map outputs via GUI automation - store_all_maps(): Headless stored map generation using RasStoreMapHelper.exe (no GUI required) - get_results_folder(): Get the folder path containing raster results for a specified plan - get_results_raster(): Get the .vrt file path for a specified plan and variable name - set_water_surface_render_mode(): Set the water surface rendering mode (horizontal or sloped) - get_water_surface_render_mode(): Get the current water surface rendering mode - add_terrain_layer(): Add terrain layer to RASMapper configuration - list_results_plans(): List all plan result layers in the RASMapper configuration - ensure_results_plan_layer(): Register a plan HDF as a RASMapper result layer - ensure_2d_encroachment_plan_layers(): Register editable 2D encroachment plan layers - list_results_map_layers(): List RASMapper result map layers - add_results_map_layer(): Add a RASMapper result map layer - list_calculated_layers(): List all calculated layers across all plan results - add_calculated_layer(): Add a calculated layer with .rasscript to the RASMapper configuration - remove_calculated_layer(): Remove a calculated layer from the RASMapper configuration - add_wse_comparison_layers(): Batch add WSE comparison layers for existing/proposed plan pairs

RASMapper Layer Discovery

The layer-list methods read the .rasmap file and return one dataframe row per layer. Use these when a workflow needs discoverable layer names and resolved paths instead of the compact, list-valued ras.rasmap_df project summary.

Python
from ras_commander import RasMap

terrain_layers = RasMap.list_terrain_layers(project_path)
terrain_display = RasMap.list_terrain_display_settings(project_path)
landcover_layers = RasMap.list_landcover_layers(project_path)
soils_layers = RasMap.list_soils_layers(project_path)
infiltration_layers = RasMap.list_infiltration_layers(project_path)

list_land_classification_layers() is the broad parser for RASMapper Type="LandCoverLayer" entries. The land-cover, soils, and infiltration methods are filtered convenience wrappers around that catalog.

Classification Polygon Overrides

RasMap.add_land_classification_polygon() authors the RAS Mapper Classification Polygons sidecar group used by land-cover, soils, and infiltration layers. It also upserts the affected Raster Map and Variables class rows when those datasets exist.

Python
from shapely.geometry import box
from ras_commander import RasMap

polygons = RasMap.add_land_classification_polygon(
    "Land Classification/LandCover.hdf",
    box(2083000, 370500, 2083500, 371000),
    class_name="Parking Lot",
    class_id=99,
    variable_values={
        "mannings_n": 0.105,
        "percent_impervious": 95.0,
    },
)

Use list_land_classification_polygons(), update_land_classification_polygon(), and delete_land_classification_polygon() for extraction and maintenance. After editing sidecar polygons for an already-associated geometry, rerun preprocessing/property-table workflows so compiled geometry HDFs consume the new override.

Terrain Display Settings

RasMap.list_terrain_display_settings(), RasMap.get_terrain_display_settings(), and RasMap.set_terrain_display_settings() expose RASMapper terrain display controls persisted in .rasmap XML. They cover hillshade display and Z factor, contour display and interval, and terrain stitch-edge plot options such as Plot stitch TIN edges.

Python
RasMap.set_terrain_display_settings(
    project_path,
    terrain_name="TerrainWithChannel",
    hillshade_enabled=True,
    hillshade_z_factor=2.0,
    contour_enabled=True,
    contour_interval=5.0,
    stitch_edges_enabled=True,
)

CLB-272 owns these terrain display toggles. CLB-253 remains the separate terrain-modification gap for generating terrain changes such as channel modifications and interpolated cross-section terrain products.

Geometry HDF Layer Associations

RasMap.get_hdf_geometry_association() reads /Geometry association attributes from geometry HDFs and plan/result HDFs without mutation. RasMap.associate_geometry_layers() writes the geometry HDF attributes through Python-native h5py.

Python
association = RasMap.get_hdf_geometry_association("MyModel.g01.hdf")
print(association["terrain_hdf_path"])

RasMap.associate_geometry_layers(
    project_path,
    "MyModel.g01.hdf",
    terrain_hdf_path="Terrain/ExistingTerrain.hdf",
    landcover_hdf_path="Land Classification/LandCover.hdf",
    infiltration_hdf_path="Land Classification/Infiltration.hdf",
)

Compiled HDF only

associate_geometry_layers() updates an existing .g##.hdf. It does not compile plain-text .g## geometry into HDF or create missing geometry datasets.

RasProcess

RasProcess

RasProcess - Wrapper for RasProcess.exe CLI automation

This module provides functionality to automate HEC-RAS mapping operations through the undocumented RasProcess.exe command-line interface. It enables headless generation of stored maps, preprocessing, and other RASMapper operations.

Key Features: - Generate stored maps (WSE, Depth, Velocity, etc.) without GUI - Support for Max/Min profiles and specific timesteps - Automatic .rasmap modification for stored map configuration - Georeferencing fix for StoreMap command bug - Linux support via Wine (headless, no display required)

Classes:

Name Description
RasProcess

Static class for RasProcess.exe CLI operations

Note

RasProcess.exe is an undocumented CLI tool bundled with HEC-RAS that exposes RASMapper automation functionality. See rasmapper_docs/16_rasprocess_cli_reference.md for complete documentation.

Linux/Wine Support: On Linux, RasProcess.exe runs under Wine with .NET Framework 4.8. Requirements: - Wine 8.0+ (64-bit prefix) - winetricks: dotnet48, gdiplus, corefonts - HEC-RAS DLLs copied into Wine prefix (see setup_wine_environment())

Text Only
The module auto-detects Linux and wraps commands with Wine automatically.
No code changes needed for users — just install the Wine environment.

RasProcess Details

RasProcess.exe CLI

RasProcess.exe is an undocumented command-line interface bundled with HEC-RAS that enables headless automation of RASMapper operations. The RasProcess class wraps this CLI for programmatic access.

Geometry Association Validator

validate_geometry_association_cli() runs the native RasProcess.exe SetGeometryAssociation command and compares the resulting /Geometry attributes against ras-commander's expected HEC-RAS-style attributes.

Python
from ras_commander import RasProcess

result = RasProcess.validate_geometry_association_cli(
    "MyModel.g01.hdf",
    terrain_hdf_path="Terrain/ExistingTerrain.hdf",
    landcover_hdf_path="Land Classification/LandCover.hdf",
    ras_version="7.0",
)

print(result["passed"])
print(result["return_code"])
print(result["mismatches"])

The returned dictionary includes the native command arguments, return code, stdout/stderr, before/after attributes, expected attributes, mismatch list, and passed.

In-place mutation

This method mutates the supplied HDF. It exists as a native reference validator for disposable copies or intentional validation runs. Normal workflows should call RasMap.associate_geometry_layers().

Supported Map Types
Parameter XML Type Display Name Default
wse elevation WSE True
depth depth Depth True
velocity velocity Velocity True
froude froude Froude False
shear_stress Shear Shear Stress False
depth_x_velocity depth and velocity D * V False
depth_x_velocity_sq depth and velocity squared D * V² False
Profile Selection

The profile parameter accepts:

  • "Max" - Maximum values across all timesteps (default)
  • "Min" - Minimum values across all timesteps
  • Specific timestamp string from get_plan_timestamps() (e.g., "10SEP2018 02:30:00")
Basic Usage
Python
from ras_commander import init_ras_project, RasProcess

# Initialize project
init_ras_project("path/to/project", "7.0")

# Generate default maps (WSE, Depth, Velocity)
results = RasProcess.store_maps(
    plan_number="01",
    profile="Max",
    wse=True,
    depth=True,
    velocity=True
)

# Results is a dict: {'wse': [Path(...)], 'depth': [...], ...}
for map_type, files in results.items():
    print(f"{map_type}: {len(files)} file(s)")
Batch Processing
Python
# Generate maps for ALL plans with HDF results
all_results = RasProcess.store_all_maps(
    profile="Max",
    wse=True,
    depth=True,
    velocity=True,
    froude=True
)

for plan_num, files in all_results.items():
    print(f"Plan {plan_num}: {sum(len(f) for f in files.values())} files")
Timestep Maps
Python
# Get available timestamps
timestamps = RasProcess.get_plan_timestamps("01")
print(f"Available: {timestamps[:3]}...")  # ['10SEP2018 00:00:00', ...]

# Generate map for specific time
results = RasProcess.store_maps(
    plan_number="01",
    profile=timestamps[10],  # 10th timestep
    wse=True
)

Georeferencing Fix

RasProcess.exe has a known bug where generated TIFs may lack proper CRS information. Set fix_georef=True (default) to automatically apply the CRS from the project's projection file using rasterio.

Custom Output Path

By default, RasProcess.exe writes to <project_folder>/<Plan ShortID>/. Use the output_path parameter to redirect output to any directory:

Python
# Output to custom location
results = RasProcess.store_maps(
    plan_number="01",
    output_path="C:/Exports/FloodMaps",
    depth=True, wse=True
)
# Files moved to C:/Exports/FloodMaps/ after generation

How output_path Works

The default StoreAllMaps command hardcodes output to <Plan ShortID>/. When output_path is specified, individual StoreMap XML commands are used instead, with an absolute OutputBaseFilename that bypasses the ShortID prefix via C#'s Path.Combine() behavior. Relative paths are resolved against the project folder.

CLB Engineering Corporation  ·  LLM Forward Engineering
RAS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.