Skip to content

Steady Flow Authoring

Python
from pathlib import Path

import pandas as pd
from IPython.display import display

import ras_commander
from ras_commander import (
    HdfResultsPlan,
    RasCmdr,
    RasExamples,
    RasPlan,
    RasPrj,
    RasSteady,
    init_ras_project,
)

# Report which ras-commander build is in use: its version string and the file
# it was imported from (useful for telling an installed package from a source
# checkout).
print(f"ras-commander: {ras_commander.__version__}")
print(f"Loaded from: {ras_commander.__file__}")

Development Mode

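When running this notebook from a source checkout instead of an installed package, start Jupyter from the repository root or add the repository root to PYTHONPATH before launching Jupyter. Generated example projects and run outputs are written under working/steady_flow_authoring/ in the repository root; if no repository root is found at or above the current working directory, that folder is created under the current working directory instead.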

Steady Flow File Authoring

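This notebook validates RasSteady.create_flow_file() and RasSteady.update_flow_file() against a real HEC-RAS steady-flow model. For each downstream boundary-condition type (Known WS, Normal Depth, Critical Depth, Rating Curve), the flow file is authored in a freshly extracted copy of the Beaver Creek example, the plan is computed with HEC-RAS, and the compute messages stored in the plan HDF file are checked for completion and for boundary-related problems.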

Python
def find_repo_root(start: Path) -> Path:
    """Locate the repository root at or above ``start``.

    The root is the nearest directory, beginning with ``start`` itself and
    then walking up through its parents, that contains both a
    ``pyproject.toml`` entry and a ``ras_commander`` entry.

    Args:
        start: Directory from which the upward search begins.

    Returns:
        The first matching directory, or ``start`` unchanged when no
        directory on the path to the filesystem root matches.
    """
    required_entries = ("pyproject.toml", "ras_commander")
    matching_dirs = (
        folder
        for folder in (start, *start.parents)
        if all((folder / entry).exists() for entry in required_entries)
    )
    # Fall back to the starting directory when nothing on the way up matches.
    return next(matching_dirs, start)


# Anchor all paths at the repository root when one is found at or above the
# current working directory; find_repo_root() returns the working directory
# itself when no root is found.
REPO_ROOT = find_repo_root(Path.cwd())
# Scratch directory for this notebook; each boundary variant is extracted into
# its own subdirectory here.
WORK_ROOT = REPO_ROOT / "working" / "steady_flow_authoring"
# Example project name passed to RasExamples.extract_project().
PROJECT_NAME = "Example 2 - Beaver Creek"
# Plan number passed to RasCmdr.compute_plan() and used to build the plan HDF
# file name (".p01.hdf").
PLAN_NUMBER = "01"
# HEC-RAS version string passed to init_ras_project().
RAS_VERSION = "7.0"

# Create the scratch directory up front; exist_ok=True keeps re-runs of this
# cell from failing when the directory is already there.
WORK_ROOT.mkdir(parents=True, exist_ok=True)
print(f"Working root: {WORK_ROOT}")
Python
# One entry per downstream boundary-condition type exercised by this notebook.
#   name:     subdirectory of WORK_ROOT that receives this variant's extracted
#             project copy.
#   label:    profile name written to the flow file; also the value the
#             read-back boundary "type_name" is compared against, and the
#             "boundary" column of the summary table.
#   boundary: downstream boundary definition built by the matching RasSteady
#             helper and passed to RasSteady.boundary(downstream=...).
BOUNDARY_VARIANTS = [
    {
        "name": "known_ws",
        "label": "Known WS",
        "boundary": RasSteady.known_water_surface(210.5),
    },
    {
        "name": "normal_depth",
        "label": "Normal Depth",
        "boundary": RasSteady.normal_depth(0.0012),
    },
    {
        "name": "critical_depth",
        "label": "Critical Depth",
        "boundary": RasSteady.critical_depth(),
    },
    {
        "name": "rating_curve",
        "label": "Rating Curve",
        # Three flow/stage points, listed with both values increasing.
        "boundary": RasSteady.rating_curve(
            [
                {"flow": 4000.0, "stage": 208.0},
                {"flow": 9000.0, "stage": 210.0},
                {"flow": 16000.0, "stage": 212.0},
            ]
        ),
    },
]


def boundary_problem_lines(messages: str) -> list[str]:
    """Pick out boundary-related problem lines from compute message text.

    A line is reported when, compared case-insensitively, it contains the
    substring ``"bound"`` together with at least one of the problem keywords
    ``error``, ``warning``, ``fatal``, ``failed`` or ``invalid``.

    Args:
        messages: Compute message text, split into lines with
            ``str.splitlines()``.

    Returns:
        The matching lines in their original order, each stripped of leading
        and trailing whitespace. Empty when nothing matches.
    """
    keywords = ("error", "warning", "fatal", "failed", "invalid")
    flagged = []
    for raw_line in messages.splitlines():
        lowered = raw_line.lower()
        # Lines that do not mention a boundary are never reported.
        if "bound" not in lowered:
            continue
        if any(keyword in lowered for keyword in keywords):
            flagged.append(raw_line.strip())
    return flagged
Python
def author_and_compute_boundary(variant: dict) -> dict:
    """Author, compute, and verify one steady-flow boundary variant.

    Extracts a copy of the example project into the variant's own folder,
    rewrites its steady flow file with a single profile and the variant's
    downstream boundary, updates the flow title, reads the file back to
    confirm what was written, runs the plan through HEC-RAS, and checks the
    compute messages stored in the plan HDF.

    Args:
        variant: Mapping with the keys ``"name"`` (output subdirectory under
            ``WORK_ROOT``), ``"label"`` (profile name and expected read-back
            boundary ``type_name``) and ``"boundary"`` (downstream boundary
            definition).

    Returns:
        A summary row with the keys ``boundary``, ``project``, ``flow_file``,
        ``plan_hdf``, ``message_chars`` and ``compute_success``.

    Raises:
        AssertionError: If the flow file path is missing, the authored file
            does not read back as expected, the compute result is falsy, the
            plan HDF does not exist, a completion marker is absent from the
            compute messages, or a boundary-related problem line is found.
    """
    extracted_dir = RasExamples.extract_project(
        PROJECT_NAME,
        output_path=WORK_ROOT / variant["name"],
    )

    # Each call initializes its own RasPrj instance and passes it explicitly
    # to every ras-commander call below.
    project = RasPrj()
    init_ras_project(extracted_dir, RAS_VERSION, ras_object=project)
    flow_file = RasPlan.get_flow_path("01", ras_object=project)
    assert flow_file is not None, "Expected Beaver Creek steady flow file f01"

    label = variant["label"]
    downstream_bc = RasSteady.boundary(
        "Beaver Creek",
        "Kentwood",
        profile=1,
        downstream=variant["boundary"],
    )
    # One flow change carrying a single flow value, matching the single
    # profile name written below.
    inflow = {
        "river": "Beaver Creek",
        "reach": "Kentwood",
        "station": "5.99",
        "flows": [10000],
    }
    RasSteady.create_flow_file(
        flow_file,
        flow_title=f"CLB-300 {label} authored flow",
        profile_names=[label],
        flow_changes=[inflow],
        boundaries=[downstream_bc],
        program_version="7.00",
    )
    # Exercise the update path on the freshly authored file as well.
    RasSteady.update_flow_file(
        flow_file,
        flow_title=f"CLB-300 {label} authored and updated flow",
    )

    # Read the file back and confirm the profile, flow count, and boundary type.
    round_trip = RasSteady.read_flow_file(flow_file)
    assert round_trip["profile_names"] == [label]
    assert len(round_trip["flow_changes"][0]["flows"]) == 1
    assert round_trip["boundaries"][0]["downstream"]["type_name"] == label

    succeeded = RasCmdr.compute_plan(
        PLAN_NUMBER,
        ras_object=project,
        num_cores=1,
        verify=False,
        force_rerun=True,
        use_optimal_hdf_settings=True,
    )
    assert succeeded, f"HEC-RAS compute failed for {label}"

    hdf_name = f"{project.project_name}.p{PLAN_NUMBER}.hdf"
    plan_hdf = extracted_dir / hdf_name
    assert plan_hdf.exists(), f"Expected plan HDF at {plan_hdf}"
    messages = HdfResultsPlan.get_compute_messages_hdf_only(plan_hdf)
    # Both completion markers must appear in the compute messages.
    for marker in ("Complete Process", "Finished Steady Flow Simulation"):
        assert marker in messages

    flagged = boundary_problem_lines(messages)
    assert not flagged, "\n".join(flagged)

    return {
        "boundary": label,
        "project": str(extracted_dir),
        "flow_file": str(flow_file),
        "plan_hdf": str(plan_hdf),
        "message_chars": len(messages),
        "compute_success": bool(succeeded),
    }
Python
# Run the author/compute/verify cycle once per boundary variant and collect
# the returned summary rows into a table.
results = [author_and_compute_boundary(variant) for variant in BOUNDARY_VARIANTS]
summary = pd.DataFrame(results)
display(summary)

# Final checks: every row reports a successful compute, and the table covers
# exactly the four expected boundary-condition labels.
assert summary["compute_success"].all()
assert set(summary["boundary"]) == {
    "Known WS",
    "Normal Depth",
    "Critical Depth",
    "Rating Curve",
}
CLB Engineering Corporation  ·  LLM Forward Engineering
RAS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.