Skip to content

Steady Flow Authoring

Python
from pathlib import Path

import pandas as pd
from IPython.display import display

import ras_commander
from ras_commander import (
    HdfResultsPlan,
    RasCmdr,
    RasExamples,
    RasPlan,
    RasPrj,
    RasSteady,
    init_ras_project,
)

print(f"ras-commander: {ras_commander.__version__}")
print(f"Loaded from: {ras_commander.__file__}")
Text Only
ras-commander: 0.98.0
Loaded from: <workspace>\ras_commander\__init__.py

Development Mode

When running this notebook from a source checkout instead of an installed package, start Jupyter from the repository root or add the repository root to PYTHONPATH before launching Jupyter. Generated example projects and run outputs are written under working/.

Steady Flow File Authoring

This notebook validates RasSteady.create_flow_file() and RasSteady.update_flow_file() against a real HEC-RAS steady-flow model. Each boundary-condition type is written to an extracted copy of the Beaver Creek example, computed with HEC-RAS, and checked through plan HDF compute messages.

Python
def find_repo_root(start: Path) -> Path:
    for candidate in [start, *start.parents]:
        if (candidate / "pyproject.toml").exists() and (candidate / "ras_commander").exists():
            return candidate
    return start


REPO_ROOT = find_repo_root(Path.cwd())
WORK_ROOT = REPO_ROOT / "working" / "steady_flow_authoring"
PROJECT_NAME = "Example 2 - Beaver Creek"
PLAN_NUMBER = "01"
RAS_VERSION = "7.0"

WORK_ROOT.mkdir(parents=True, exist_ok=True)
print(f"Working root: {WORK_ROOT}")
Text Only
Working root: <workspace>\working\steady_flow_authoring
Python
BOUNDARY_VARIANTS = [
    {
        "name": "known_ws",
        "label": "Known WS",
        "boundary": RasSteady.known_water_surface(210.5),
    },
    {
        "name": "normal_depth",
        "label": "Normal Depth",
        "boundary": RasSteady.normal_depth(0.0012),
    },
    {
        "name": "critical_depth",
        "label": "Critical Depth",
        "boundary": RasSteady.critical_depth(),
    },
    {
        "name": "rating_curve",
        "label": "Rating Curve",
        "boundary": RasSteady.rating_curve(
            [
                {"flow": 4000.0, "stage": 208.0},
                {"flow": 9000.0, "stage": 210.0},
                {"flow": 16000.0, "stage": 212.0},
            ]
        ),
    },
]


def boundary_problem_lines(messages: str) -> list[str]:
    problem_words = ("error", "warning", "fatal", "failed", "invalid")
    return [
        line.strip()
        for line in messages.splitlines()
        if "bound" in line.lower()
        and any(word in line.lower() for word in problem_words)
    ]
Python
def author_and_compute_boundary(variant: dict) -> dict:
    project_path = RasExamples.extract_project(
        PROJECT_NAME,
        output_path=WORK_ROOT / variant["name"],
    )

    ras_project = RasPrj()
    init_ras_project(project_path, RAS_VERSION, ras_object=ras_project)
    flow_path = RasPlan.get_flow_path("01", ras_object=ras_project)
    assert flow_path is not None, "Expected Beaver Creek steady flow file f01"

    RasSteady.create_flow_file(
        flow_path,
        flow_title=f"CLB-300 {variant['label']} authored flow",
        profile_names=[variant["label"]],
        flow_changes=[
            {
                "river": "Beaver Creek",
                "reach": "Kentwood",
                "station": "5.99",
                "flows": [10000],
            }
        ],
        boundaries=[
            RasSteady.boundary(
                "Beaver Creek",
                "Kentwood",
                profile=1,
                downstream=variant["boundary"],
            )
        ],
        program_version="7.00",
    )
    RasSteady.update_flow_file(
        flow_path,
        flow_title=f"CLB-300 {variant['label']} authored and updated flow",
    )

    authored = RasSteady.read_flow_file(flow_path)
    assert authored["profile_names"] == [variant["label"]]
    assert len(authored["flow_changes"][0]["flows"]) == 1
    assert authored["boundaries"][0]["downstream"]["type_name"] == variant["label"]

    result = RasCmdr.compute_plan(
        PLAN_NUMBER,
        ras_object=ras_project,
        num_cores=1,
        verify=False,
        force_rerun=True,
        use_optimal_hdf_settings=True,
    )
    assert result, f"HEC-RAS compute failed for {variant['label']}"

    plan_hdf = project_path / f"{ras_project.project_name}.p{PLAN_NUMBER}.hdf"
    assert plan_hdf.exists(), f"Expected plan HDF at {plan_hdf}"
    messages = HdfResultsPlan.get_compute_messages_hdf_only(plan_hdf)
    assert "Complete Process" in messages
    assert "Finished Steady Flow Simulation" in messages

    boundary_problems = boundary_problem_lines(messages)
    assert not boundary_problems, "\n".join(boundary_problems)

    return {
        "boundary": variant["label"],
        "project": str(project_path),
        "flow_file": str(flow_path),
        "plan_hdf": str(plan_hdf),
        "message_chars": len(messages),
        "compute_success": bool(result),
    }
Python
results = [author_and_compute_boundary(variant) for variant in BOUNDARY_VARIANTS]
summary = pd.DataFrame(results)
display(summary)

assert summary["compute_success"].all()
assert set(summary["boundary"]) == {
    "Known WS",
    "Normal Depth",
    "Critical Depth",
    "Rating Curve",
}
boundary project flow_file plan_hdf message_chars compute_success
0 Known WS 719 True
1 Normal Depth 719 True
2 Critical Depth 719 True
3 Rating Curve 719 True