Skip to content

Using eBFE Models: Spring River Validation

This notebook validates the organized Spring River eBFE delivery as a results-ready HEC-RAS 6.1 example. It uses the shared delivery workspace, reviews the saved audit evidence, optionally executes a fresh geometry-preprocessor validation run, confirms the local compatibility assets that distinguish Spring River from Spring Creek, and verifies that all seven plan result HDFs resolve inside the organized RAS project folder.

What This Notebook Proves

  1. RasEbfeModels.organize_model("spring-river") resolves to the shared organized delivery for HUC 11010010.
  2. The organized study remains clearly distinct from spring-creek / SpringCreek_12040102 in code, folder names, and notebook messaging.
  3. The organized RAS project has local projection, terrain, Land Cover, legacy Land Classification, DSS, and RASMapper assets.
  4. The optional notebook validation run can execute the HEC-RAS 6.1 geometry preprocessor for plan 01 / geometry 01 and archives fresh review evidence under the configured artifact root.
  5. All seven hydraulic result HDFs are present and resolve from ras.plan_df inside the organized project folder.
Python
from pathlib import Path
import json
import logging
import os
import sys

import h5py
import pandas as pd
from IPython.display import display

def find_repo_root(start: Path | None = None) -> Path:
    search_start = (start or Path.cwd()).resolve()
    for candidate in [search_start, *search_start.parents]:
        if (candidate / "scripts" / "ebfe_geometry_preprocessor_batch.py").exists():
            return candidate
    raise FileNotFoundError("Could not locate repository root from current working directory.")

try:
    from ras_commander import RasPrj, RasUtils, init_ras_project
    from ras_commander.sources.federal import RasEbfeModels
except ImportError:
    repo_root = find_repo_root()
    sys.path.insert(0, str(repo_root))
    from ras_commander import RasPrj, RasUtils, init_ras_project
    from ras_commander.sources.federal import RasEbfeModels

logging.getLogger().setLevel(logging.WARNING)
logging.getLogger("ras_commander").setLevel(logging.WARNING)
pd.options.display.max_colwidth = 160

Configure the eBFE Workspace

Python
EBFE_WORKSPACE = Path(os.environ.get("RAS_COMMANDER_EBFE_ROOT", "H:/Testing/eBFE Model Organization"))
DOWNLOAD_ROOT = EBFE_WORKSPACE / "Downloads"
ORGANIZED_ROOT = EBFE_WORKSPACE / "Organized"
VALIDATION_ROOT = EBFE_WORKSPACE / "Validation" / "ebfe_delivery"
REVIEW_ROOT = Path(os.environ.get("RAS_COMMANDER_REVIEW_ROOT", str(VALIDATION_ROOT / "notebook_review")))
NOTEBOOK_ARTIFACT_ROOT = REVIEW_ROOT / "spring_river_notebook"
NOTEBOOK_ARTIFACT_ROOT.mkdir(parents=True, exist_ok=True)

organized_folder = ORGANIZED_ROOT / "SpringRiver_11010010"
if not organized_folder.exists():
    organized_folder = RasEbfeModels.organize_model(
        "spring-river",
        download_root=DOWNLOAD_ROOT,
        output_root=ORGANIZED_ROOT,
    )

ras_model_root = organized_folder / "RAS Model"
print(f"Spring River organized folder: {organized_folder}")
print(f"RAS Model root exists: {ras_model_root.exists()}")
print(f"Validation root: {VALIDATION_ROOT}")
print(f"Review artifact root: {NOTEBOOK_ARTIFACT_ROOT}")

Confirm Model Identity and Naming

Python
projects = RasUtils.find_valid_ras_folders(
    ras_model_root,
    max_depth=10,
    return_project_info=True,
)
project_df = pd.DataFrame(projects)
project_folder = Path(project_df.iloc[0]["folder"])

model_identity = pd.DataFrame(
    [
        {
            "model_key": "spring-river",
            "huc8": "11010010",
            "organized_folder": organized_folder.name,
            "project_name": project_df.iloc[0]["project_name"],
            "ras_version": "6.1",
            "distinguishes_from": "spring-creek / SpringCreek_12040102",
        }
    ]
)
model_identity

Discover and Initialize the RAS Project

Python
display(project_df[["project_name", "folder", "prj_file"]])

ras = RasPrj()
init_ras_project(
    project_folder,
    "6.1",
    ras_object=ras,
    load_results_summary=False,
)

plan_columns = [
    column for column in ["plan_number", "flow_type", "Geom File", "Flow File", "HDF_Results_Path"]
    if column in ras.plan_df.columns
]
display(ras.plan_df[plan_columns].sort_values("plan_number"))

Confirm Local Delivery Assets and Compatibility Copies

Python
asset_rows = []
for label, pattern in {
    "projection": "Projection/*.prj",
    "terrain_hdf": "Terrain/**/*.hdf",
    "land_cover_tif": "Land Cover/**/*.tif",
    "land_cover_hdf": "Land Cover/**/*.hdf",
    "legacy_land_classification_tif": "Land Classification/**/*.tif",
    "legacy_land_classification_hdf": "Land Classification/**/*.hdf",
    "dss": "DSS Inputs/*.dss",
    "rasmap": "*.rasmap",
}.items():
    for path in sorted(project_folder.glob(pattern)):
        asset_rows.append(
            {
                "asset_type": label,
                "file": path.name,
                "relative_path": str(path.relative_to(project_folder)),
                "size_mb": round(path.stat().st_size / 1_000_000, 3),
            }
        )

asset_df = pd.DataFrame(asset_rows).sort_values(["asset_type", "relative_path"]).reset_index(drop=True)

def read_land_cover_attr(path: Path) -> str:
    with h5py.File(path, "r") as hdf:
        attr = hdf["Geometry"].attrs["Land Cover Filename"]
        return attr.decode("utf-8") if isinstance(attr, bytes) else str(attr)

land_cover_reference = read_land_cover_attr(project_folder / "Spring_BLE.g01.hdf")
compatibility_df = pd.DataFrame(
    [
        {
            "geometry_hdf": "Spring_BLE.g01.hdf",
            "land_cover_reference": land_cover_reference,
            "legacy_copy_exists": (project_folder / "Land Classification" / "LandCover (1).hdf").exists(),
            "delivery_land_cover_exists": (project_folder / "Land Cover" / "LandCover (1).hdf").exists(),
        }
    ]
)

display(asset_df)
compatibility_df

Review Saved Audit Evidence

Python
def find_latest_json(root: Path, patterns: list[str], text_patterns: list[str] | None = None) -> Path | None:
    if not root.exists():
        return None
    candidates = []
    for pattern in patterns:
        candidates.extend(root.rglob(pattern))
    candidates = sorted({path for path in candidates if path.is_file()}, key=lambda path: path.stat().st_mtime)
    if not text_patterns:
        return candidates[-1] if candidates else None
    for path in reversed(candidates):
        try:
            text = path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            continue
        if any(pattern.lower() in text.lower() for pattern in text_patterns):
            return path
    return None


def size_gb(path: Path) -> float:
    return round(path.stat().st_size / 1_000_000_000, 3)


def classify_plan_hdf(path: Path) -> dict[str, object]:
    with h5py.File(path, "r") as hdf:
        top_groups = sorted(hdf.keys())
        has_geometry = "Geometry" in hdf
        has_plan_data = "Plan Data" in hdf
        has_event_conditions = "Event Conditions" in hdf
        has_summary_only = list(hdf.keys()) == ["Results"] and "Summary" in hdf["Results"]
        if has_geometry and has_plan_data:
            classification = "hydraulic results"
        elif has_summary_only:
            classification = "compute messages only"
        else:
            classification = "unknown HDF layout"
    return {
        "file": path.name,
        "size_gb": size_gb(path),
        "classification": classification,
        "has_geometry": has_geometry,
        "has_plan_data": has_plan_data,
        "has_event_conditions": has_event_conditions,
        "top_groups": ", ".join(top_groups),
    }


def existing_path_count(values) -> int:
    return sum(bool(value) and Path(str(value)).exists() for value in values)


audit_path = find_latest_json(
    VALIDATION_ROOT,
    ["e2e_delivery_audit_*.json"],
    ["SpringRiver_11010010", "Spring River", "11010010"],
)
print(f"Audit report: {audit_path}")

audit_payload = json.loads(audit_path.read_text(encoding="utf-8")) if audit_path else []
spring_river_record = audit_payload[0] if audit_payload else {}
project_record = spring_river_record.get("projects", [{}])[0]

audit_summary = pd.DataFrame(
    [
        {
            "status": spring_river_record.get("status"),
            "hms_status": spring_river_record.get("hms_status"),
            "project_count": spring_river_record.get("project_count"),
            "audit_issues": len(spring_river_record.get("issues", [])),
            "project_crs": project_record.get("project_crs"),
            "terrain_layers": len(project_record.get("terrain_layers", [])),
            "land_cover_layers": len(project_record.get("land_cover_layers", [])),
            "dss_reference_count": project_record.get("dss_reference_count"),
            "output_hdf_count": project_record.get("output_hdf_count"),
            "outputs_inside_project": project_record.get("outputs_inside_project"),
            "crs_mismatches": len(project_record.get("terrain_crs_mismatches", []))
            + len(project_record.get("land_cover_crs_mismatches", [])),
        }
    ]
)
audit_summary

Review Geometry-Preprocessor Evidence

Python
preprocessor_root = VALIDATION_ROOT / "preprocessor_validation"
preprocessor_path = find_latest_json(
    preprocessor_root / "spring_river_ras61_legacy_landclassification",
    ["geometry_preprocessor_validation_*.json"],
    ["Spring_BLE", "spring-river", "SpringRiver_11010010"],
)
if preprocessor_path is None:
    preprocessor_path = find_latest_json(
        preprocessor_root,
        ["geometry_preprocessor_validation_*.json"],
        ["Spring_BLE", "spring-river", "SpringRiver_11010010"],
    )
archived_preprocessor_report = preprocessor_path
print(f"Archived preprocessor report: {archived_preprocessor_report}")

preprocessor_payload = json.loads(preprocessor_path.read_text(encoding="utf-8")) if preprocessor_path else {"records": []}
records = preprocessor_payload.get("records", [])
archived_preprocessor_hdf = None
if archived_preprocessor_report is not None:
    matches = sorted(archived_preprocessor_report.parent.glob("Spring_BLE.p01.preprocessor_*.hdf"))
    archived_preprocessor_hdf = matches[-1] if matches else None

preprocessor_summary = pd.DataFrame(
    [
        {
            "generated_at": preprocessor_payload.get("generated_at"),
            "record_count": len(records),
            "status_counts": json.dumps(preprocessor_payload.get("status_counts", {}), sort_keys=True),
            "archived_preprocessor_hdf": str(archived_preprocessor_hdf) if archived_preprocessor_hdf else None,
        }
    ]
)

plan_rows = []
for record in records:
    for plan in record.get("plans", []):
        plan_rows.append(
            {
                "study": record.get("study"),
                "project": record.get("project_name"),
                "status": record.get("status"),
                "plan": plan.get("plan_number"),
                "geometry": plan.get("geometry_number"),
                "flow_type": plan.get("flow_type"),
                "elapsed_seconds": round(plan.get("elapsed_seconds", 0), 1),
                "errors": plan.get("error_count"),
                "warnings": plan.get("warning_count"),
                "return_code": plan.get("return_code"),
            }
        )

display(preprocessor_summary)
pd.DataFrame(plan_rows)

Verify Result-HDF Availability

Python
organized_plan_hdfs = sorted(project_folder.glob("*.p??.hdf"))
hdf_summary = pd.DataFrame(classify_plan_hdf(path) for path in organized_plan_hdfs)
print(
    f"{existing_path_count(ras.plan_df['HDF_Results_Path'])} of {len(ras.plan_df)} plan dataframe HDF paths exist locally"
)
display(hdf_summary)

result_path_df = ras.plan_df[["plan_number", "HDF_Results_Path"]].copy()
result_path_df["exists"] = result_path_df["HDF_Results_Path"].apply(
    lambda value: bool(value) and Path(str(value)).exists()
)
result_path_df["inside_project"] = result_path_df["HDF_Results_Path"].apply(
    lambda value: bool(value)
    and Path(str(value)).exists()
    and project_folder in Path(str(value)).parents
)
result_path_df.sort_values("plan_number")

Optional Preprocessor Re-Run Command

Python
RUN_GEOMETRY_PREPROCESSOR = os.environ.get("RAS_COMMANDER_RUN_GEOMETRY_PREPROCESSOR", "0").strip().lower() not in {"0", "false", "no"}
GEOMETRY_PREPROCESSOR_MAX_WAIT = int(os.environ.get("RAS_COMMANDER_GEOMETRY_PREPROCESSOR_MAX_WAIT", "7200"))

repo_root = find_repo_root()
batch_script = repo_root / "scripts" / "ebfe_geometry_preprocessor_batch.py"
rerun_output_dir = NOTEBOOK_ARTIFACT_ROOT / "preprocessor_validation"
rerun_output_dir.mkdir(parents=True, exist_ok=True)
plan_01_hdf = project_folder / "Spring_BLE.p01.hdf"
temporary_full_result_backup = rerun_output_dir / "Spring_BLE.p01.full_results_backup.hdf"
validation_preprocessor_path = archived_preprocessor_report
validation_preprocessor_hdf = archived_preprocessor_hdf
validation_preprocessor_payload = preprocessor_payload
validation_preprocessor_status = "not_run"
validation_preprocessor_return_code = None
validation_preprocessor_error = None
cmd = [
    sys.executable,
    str(batch_script),
    "--root",
    str(ras_model_root),
    "--ras-version",
    "6.1",
    "--plan",
    "01",
    "--max-wait",
    str(GEOMETRY_PREPROCESSOR_MAX_WAIT),
    "--output-dir",
    str(rerun_output_dir),
]
display_cmd = [
    "python",
    str(batch_script.relative_to(repo_root)),
    *cmd[2:],
]

if RUN_GEOMETRY_PREPROCESSOR:
    import shutil
    import subprocess

    if not batch_script.exists():
        validation_preprocessor_status = "skipped"
        validation_preprocessor_error = f"Batch script not found at {batch_script}"
        print(f"SKIP: {validation_preprocessor_error}")
    elif not plan_01_hdf.exists():
        validation_preprocessor_status = "skipped"
        validation_preprocessor_error = f"Plan 01 result HDF not found before preprocessor run: {plan_01_hdf}"
        print(f"SKIP: {validation_preprocessor_error}")
    else:
        shutil.copy2(plan_01_hdf, temporary_full_result_backup)
        print(f"Backed up full-result plan HDF: {temporary_full_result_backup}")
        print("Running Spring River geometry preprocessor validation:")
        print(" ".join(f'"{part}"' if " " in part else part for part in display_cmd))

        try:
            completed = subprocess.run(
                cmd,
                check=False,
                timeout=GEOMETRY_PREPROCESSOR_MAX_WAIT + 300,
            )
            validation_preprocessor_return_code = completed.returncode

            rerun_preprocessor_path = find_latest_json(
                rerun_output_dir,
                ["geometry_preprocessor_validation_*.json"],
                ["Spring_BLE", "spring-river", "SpringRiver_11010010"],
            )
            if rerun_preprocessor_path is None:
                validation_preprocessor_status = "failed"
                validation_preprocessor_error = f"No rerun preprocessor report found in {rerun_output_dir}"
            else:
                validation_preprocessor_payload = json.loads(rerun_preprocessor_path.read_text(encoding="utf-8"))
                validation_records = validation_preprocessor_payload.get("records", [])
                validation_project = validation_records[0] if validation_records else {}
                validation_plan = validation_project.get("plans", [{}])[0]
                validation_generated_at = validation_preprocessor_payload.get("generated_at")
                validation_preprocessor_status = validation_project.get("status") or "unknown"
                validation_preprocessor_error = validation_plan.get("error")
                validation_preprocessor_path = rerun_preprocessor_path
                print(f"Fresh preprocessor report: {validation_preprocessor_path}")

                if validation_plan.get("success"):
                    validation_preprocessor_hdf = rerun_output_dir / f"Spring_BLE.p01.preprocessor_{validation_generated_at}.hdf"
                    shutil.copy2(plan_01_hdf, validation_preprocessor_hdf)
                    print(f"Archived fresh preprocessor HDF: {validation_preprocessor_hdf}")
                else:
                    validation_preprocessor_hdf = None
                    print(f"Fresh preprocessor status: {validation_preprocessor_status}")
                    if validation_preprocessor_error:
                        print(f"Fresh preprocessor detail: {validation_preprocessor_error}")

                validation_summary = pd.DataFrame(
                    [
                        {
                            "generated_at": validation_generated_at,
                            "status": validation_preprocessor_status,
                            "return_code": validation_preprocessor_return_code,
                            "plan": validation_plan.get("plan_number"),
                            "geometry": validation_plan.get("geometry_number"),
                            "elapsed_seconds": round(validation_plan.get("elapsed_seconds", 0), 1),
                            "errors": validation_plan.get("error_count"),
                            "warnings": validation_plan.get("warning_count"),
                            "preprocessor_hdf": str(validation_preprocessor_hdf) if validation_preprocessor_hdf else None,
                        }
                    ]
                )
                display(validation_summary)

            if validation_preprocessor_return_code not in (0, None):
                print(f"Geometry preprocessor batch exited with return code {validation_preprocessor_return_code}; see report above.")
        except subprocess.TimeoutExpired:
            validation_preprocessor_status = "timed_out"
            validation_preprocessor_error = f"Geometry preprocessor timed out after {GEOMETRY_PREPROCESSOR_MAX_WAIT + 300} seconds."
            print(f"ERROR: {validation_preprocessor_error}")
            print("Check HEC-RAS installation and model configuration.")
        except Exception as exc:
            validation_preprocessor_status = "failed"
            validation_preprocessor_error = str(exc)
            print(f"ERROR: Unexpected failure during geometry preprocessor run: {exc}")
        finally:
            if temporary_full_result_backup.exists():
                shutil.copy2(temporary_full_result_backup, plan_01_hdf)
                temporary_full_result_backup.unlink()
                print(f"Restored full-result plan HDF: {plan_01_hdf}")
else:
    print("Geometry preprocessor execution disabled via RAS_COMMANDER_RUN_GEOMETRY_PREPROCESSOR.")
    print(f"Expected review artifact path: {rerun_output_dir}")
    print(" ".join(f'"{part}"' if " " in part else part for part in display_cmd))

Write Notebook Review Artifacts

Python
summary_payload = {
    "notebook": "examples/957_ebfe_spring_river_validation.ipynb",
    "organized_folder": str(organized_folder),
    "audit_report": str(audit_path) if audit_path else None,
    "archived_preprocessor_report": str(archived_preprocessor_report) if archived_preprocessor_report else None,
    "archived_preprocessor_hdf": str(archived_preprocessor_hdf) if archived_preprocessor_hdf else None,
    "validation_preprocessor_executed": RUN_GEOMETRY_PREPROCESSOR,
    "validation_preprocessor_status": validation_preprocessor_status,
    "validation_preprocessor_return_code": validation_preprocessor_return_code,
    "validation_preprocessor_error": validation_preprocessor_error,
    "validation_preprocessor_report": str(validation_preprocessor_path) if validation_preprocessor_path else None,
    "validation_preprocessor_hdf": str(validation_preprocessor_hdf) if validation_preprocessor_hdf else None,
    "preprocessor_output_dir": str(rerun_output_dir),
    "result_hdf_count": int(len(organized_plan_hdfs)),
    "project_crs": project_record.get("project_crs"),
    "land_cover_reference": compatibility_df.iloc[0]["land_cover_reference"],
    "review_artifact_root": str(NOTEBOOK_ARTIFACT_ROOT),
}

summary_json = NOTEBOOK_ARTIFACT_ROOT / "spring_river_validation_summary.json"
summary_md = NOTEBOOK_ARTIFACT_ROOT / "spring_river_validation_summary.md"
summary_json.write_text(json.dumps(summary_payload, indent=2), encoding="utf-8")
summary_lines = [
    "# Spring River Notebook Review Summary",
    "",
    f"- Notebook: `{summary_payload['notebook']}`",
    f"- Organized folder: `{summary_payload['organized_folder']}`",
    f"- Audit report: `{summary_payload['audit_report']}`",
    f"- Archived preprocessor report: `{summary_payload['archived_preprocessor_report']}`",
    f"- Archived preprocessor HDF: `{summary_payload['archived_preprocessor_hdf']}`",
    f"- Validation preprocessor executed: `{summary_payload['validation_preprocessor_executed']}`",
    f"- Validation preprocessor status: `{summary_payload['validation_preprocessor_status']}`",
    f"- Validation preprocessor return code: `{summary_payload['validation_preprocessor_return_code']}`",
    f"- Validation preprocessor error: `{summary_payload['validation_preprocessor_error']}`",
    f"- Validation preprocessor report: `{summary_payload['validation_preprocessor_report']}`",
    f"- Validation preprocessor HDF: `{summary_payload['validation_preprocessor_hdf']}`",
    f"- Preprocessor output dir: `{summary_payload['preprocessor_output_dir']}`",
    f"- Result HDF count: `{summary_payload['result_hdf_count']}`",
    f"- Project CRS: `{summary_payload['project_crs']}`",
    f"- Geometry HDF land-cover reference: `{summary_payload['land_cover_reference']}`",
]
summary_md.write_text("\n".join(summary_lines), encoding="utf-8")
print(f"Wrote review summary: {summary_json}")
print(f"Wrote review markdown: {summary_md}")

Delivery-Format Interpretation

Spring River is a results-ready, notebook-validated eBFE example. The organized project remains clearly separated from Spring Creek through the spring-river slug and SpringRiver_11010010 folder naming, while still preserving the legacy Land Classification compatibility copy required by the delivered geometry HDF land-cover reference.

When enabled, the validation cell executes the HEC-RAS 6.1 geometry preprocessor for plan 01 / geometry 01, writes the fresh preprocessor report under NOTEBOOK_ARTIFACT_ROOT / "preprocessor_validation", restores the delivered full-result Spring_BLE.p01.hdf, confirms the organized project still resolves all seven hydraulic result HDFs, and writes an issue-scoped review summary under NOTEBOOK_ARTIFACT_ROOT.

CLB Engineering Corporation  ·  LLM Forward Engineering
RAS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.