Skip to content

SA/2D Connection Authoring

Create, inspect, delete, and re-create SA/2D area connections in a real HEC-RAS geometry file. This notebook demonstrates an engineering-oriented workflow for the Dam connection in BaldEagleDamBrk.g13:

  • inspect the connection line, crest profile, gates, terrain, mesh, and boundary context
  • sample the project terrain raster along the connection polyline
  • re-create the connection and write a terrain-derived weir crest profile
  • raise the minimum terrain-derived crest by 0.5 ft and floor lower points to that value
  • verify the result with before/after profile and map views

Primary APIs used:

  • GeomLateral.get_connections() - list SA/2D connections
  • GeomLateral.get_connection_line_coords() - read connection polyline coordinates
  • GeomLateral.get_connection_profile() - read weir crest station/elevation profile
  • GeomLateral.get_connection_gates() - read gate definitions
  • GeomLateral.delete_connection() - remove a connection block
  • GeomLateral.set_connection() - create or replace a connection block
  • GeomLateral.set_connection_profile() - write the sampled/modified weir crest profile
  • GeomLateral.set_connection_gates() - write gate definitions
  • HdfMesh and HdfBndry - read mesh cells, mesh perimeters, and BC lines from the geometry HDF

The terrain profile in this notebook is sampled directly from the extracted example project's Terrain/Terrain50.dtm_20ft.tif raster. RasExamples.extract_project() provides that terrain raster, the .rasmap, and the compiled geometry HDF used for the map context.

Development Mode

Python
#!pip install --upgrade ras-commander
Python
# =============================================================================
# DEVELOPMENT MODE TOGGLE
# =============================================================================
USE_LOCAL_SOURCE = False

if USE_LOCAL_SOURCE:
    import sys
    from pathlib import Path

    cwd = Path.cwd()
    local_path = cwd if (cwd / "ras_commander").exists() else cwd.parent
    if str(local_path) not in sys.path:
        sys.path.insert(0, str(local_path))
    print(f"LOCAL SOURCE MODE: loading from {local_path / 'ras_commander'}")
else:
    print("PIP PACKAGE MODE: loading installed ras-commander")

from pathlib import Path
import os
import logging
import warnings

import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.colors import Normalize
from matplotlib.lines import Line2D
from matplotlib.patches import Rectangle
import numpy as np
import pandas as pd
import rasterio
from rasterio.enums import Resampling
from rasterio.windows import bounds as window_bounds
from rasterio.windows import from_bounds
from shapely.geometry import LineString, box

from IPython.display import display

from ras_commander import RasExamples
from ras_commander.geom import GeomLateral
from ras_commander.hdf import HdfBndry, HdfMesh

warnings.filterwarnings("ignore", category=FutureWarning)
logging.getLogger("ras_commander").setLevel(logging.CRITICAL)

pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", 120)

plt.rcParams.update({
    "figure.dpi": 120,
    "axes.grid": True,
    "grid.alpha": 0.22,
    "axes.spines.top": False,
    "axes.spines.right": False,
})


def connection_line(coords_df):
    return LineString(coords_df[["X", "Y"]].to_numpy())


def expanded_bounds(bounds, fraction=0.25, min_pad=100.0):
    minx, miny, maxx, maxy = bounds
    width = max(maxx - minx, min_pad)
    height = max(maxy - miny, min_pad)
    pad_x = max(width * fraction, min_pad)
    pad_y = max(height * fraction, min_pad)
    return (minx - pad_x, miny - pad_y, maxx + pad_x, maxy + pad_y)


def sample_points_along_line(coords_df, spacing):
    line = connection_line(coords_df)
    stations = np.arange(0.0, line.length, spacing)
    if len(stations) == 0 or not np.isclose(stations[-1], line.length):
        stations = np.r_[stations, line.length]
    points = [line.interpolate(float(sta)) for sta in stations]
    return pd.DataFrame({
        "Station": np.round(stations, 3),
        "X": [pt.x for pt in points],
        "Y": [pt.y for pt in points],
    })


def sample_terrain_profile(coords_df, terrain_raster, spacing):
    samples = sample_points_along_line(coords_df, spacing)
    xy = list(zip(samples["X"], samples["Y"]))
    elevations = []
    with rasterio.open(terrain_raster) as src:
        for value in src.sample(xy, masked=True):
            elev = value[0]
            elevations.append(np.nan if np.ma.is_masked(elev) else float(elev))

    samples["Elevation"] = elevations
    if samples["Elevation"].isna().any():
        samples["Elevation"] = samples["Elevation"].interpolate(limit_direction="both")
    if samples["Elevation"].isna().any():
        raise ValueError("Terrain sampling produced unresolved NoData elevations.")
    return samples[["Station", "Elevation", "X", "Y"]]


def hillshade(elev_ma, azimuth=315, altitude=45):
    arr = np.ma.filled(elev_ma.astype(float), np.nan)
    if np.isnan(arr).all():
        return np.ma.masked_all(arr.shape)
    arr = np.where(np.isnan(arr), np.nanmean(arr), arr)
    dy, dx = np.gradient(arr)
    slope = np.pi / 2.0 - np.arctan(np.hypot(dx, dy))
    aspect = np.arctan2(-dx, dy)
    az = np.deg2rad(azimuth)
    alt = np.deg2rad(altitude)
    shaded = (
        np.sin(alt) * np.sin(slope)
        + np.cos(alt) * np.cos(slope) * np.cos(az - aspect)
    )
    return np.clip((shaded + 1.0) / 2.0, 0.0, 1.0)


def read_terrain_window(terrain_raster, bounds, max_pixels=900):
    left, bottom, right, top = bounds
    with rasterio.open(terrain_raster) as src:
        window = from_bounds(left, bottom, right, top, transform=src.transform)
        window = window.round_offsets().round_lengths()
        scale = max(window.width / max_pixels, window.height / max_pixels, 1.0)
        out_height = max(1, int(window.height / scale))
        out_width = max(1, int(window.width / scale))
        elev = src.read(
            1,
            window=window,
            out_shape=(out_height, out_width),
            masked=True,
            boundless=True,
            resampling=Resampling.bilinear,
        )
        left, bottom, right, top = window_bounds(window, src.transform)
        extent = (left, right, bottom, top)
    return elev, extent


def add_north_arrow(ax):
    ax.annotate(
        "N",
        xy=(0.94, 0.90),
        xytext=(0.94, 0.78),
        xycoords="axes fraction",
        ha="center",
        va="center",
        fontsize=10,
        arrowprops=dict(arrowstyle="-|>", color="black", linewidth=1.5),
        bbox=dict(boxstyle="round,pad=0.18", facecolor="white", edgecolor="0.7", alpha=0.8),
        zorder=20,
    )


def nice_scale_length(raw_length):
    if raw_length <= 0:
        return 100.0
    exponent = np.floor(np.log10(raw_length))
    base = raw_length / (10 ** exponent)
    step = 1 if base < 1.5 else 2 if base < 3.5 else 5 if base < 7.5 else 10
    return float(step * (10 ** exponent))


def add_scale_bar(ax):
    xmin, xmax = ax.get_xlim()
    ymin, ymax = ax.get_ylim()
    width = xmax - xmin
    height = ymax - ymin
    length = nice_scale_length(width * 0.18)
    x0 = xmin + width * 0.06
    y0 = ymin + height * 0.07
    ax.plot([x0, x0 + length], [y0, y0], color="black", linewidth=3, zorder=20)
    ax.plot([x0, x0], [y0 - height * 0.01, y0 + height * 0.01], color="black", linewidth=1.5, zorder=20)
    ax.plot([x0 + length, x0 + length], [y0 - height * 0.01, y0 + height * 0.01], color="black", linewidth=1.5, zorder=20)
    label = f"{length:,.0f} ft" if length < 5280 else f"{length / 5280:.1f} mi"
    ax.text(
        x0 + length / 2,
        y0 + height * 0.025,
        label,
        ha="center",
        va="bottom",
        fontsize=8,
        bbox=dict(facecolor="white", edgecolor="0.7", alpha=0.8, pad=2),
        zorder=20,
    )


def dedupe_legend(ax, loc="upper right", fontsize=8):
    handles, labels = ax.get_legend_handles_labels()
    seen = {}
    for handle, label in zip(handles, labels):
        if label and label not in seen:
            seen[label] = handle
    if seen:
        ax.legend(seen.values(), seen.keys(), loc=loc, fontsize=fontsize, frameon=True)


def label_lines(ax, gdf, column, bounds, fontsize=8):
    if gdf.empty:
        return
    view = box(*bounds)
    for _, row in gdf.iterrows():
        geom = row.geometry
        if geom is None or geom.is_empty or not geom.intersects(view):
            continue
        label_pt = geom.interpolate(0.5, normalized=True)
        ax.text(
            label_pt.x,
            label_pt.y,
            str(row[column]),
            fontsize=fontsize,
            ha="center",
            va="center",
            bbox=dict(facecolor="white", edgecolor="0.7", alpha=0.75, pad=1.5),
            zorder=15,
        )


def plot_context_map(
    ax,
    terrain_raster,
    mesh_areas,
    mesh_cells,
    bc_lines,
    bounds,
    title,
    show_cells=True,
):
    elev, extent = read_terrain_window(terrain_raster, bounds)
    valid = elev.compressed()
    if len(valid):
        vmin, vmax = np.percentile(valid, [2, 98])
    else:
        vmin, vmax = None, None
    ax.imshow(
        elev,
        extent=extent,
        origin="upper",
        cmap="gist_earth",
        vmin=vmin,
        vmax=vmax,
        alpha=0.82,
        zorder=0,
    )
    ax.imshow(hillshade(elev), extent=extent, origin="upper", cmap="gray", alpha=0.22, zorder=1)
    if len(valid):
        left_e, right_e, bottom_e, top_e = extent
        xs = np.linspace(left_e, right_e, elev.shape[1])
        ys = np.linspace(top_e, bottom_e, elev.shape[0])
        levels = np.linspace(vmin, vmax, 9)
        ax.contour(
            xs,
            ys,
            np.ma.filled(elev, np.nan),
            levels=levels,
            colors="white",
            linewidths=0.3,
            alpha=0.45,
            zorder=2,
        )

    left, bottom, right, top = bounds
    if show_cells and not mesh_cells.empty:
        cells_view = mesh_cells.cx[left:right, bottom:top]
        if not cells_view.empty:
            cells_view.boundary.plot(ax=ax, color="#4a5568", linewidth=0.28, alpha=0.50, zorder=3, label="2D mesh cells")

    if not mesh_areas.empty:
        mesh_areas.boundary.plot(ax=ax, color="#1f4e79", linewidth=1.2, alpha=0.95, zorder=4, label="2D flow area")
        for _, row in mesh_areas.iterrows():
            pt = row.geometry.representative_point()
            if left <= pt.x <= right and bottom <= pt.y <= top:
                ax.text(
                    pt.x,
                    pt.y,
                    row["mesh_name"],
                    fontsize=9,
                    weight="bold",
                    ha="center",
                    va="center",
                    bbox=dict(facecolor="white", edgecolor="#1f4e79", alpha=0.7, pad=2),
                    zorder=14,
                )

    if not bc_lines.empty:
        bc_view = bc_lines.cx[left:right, bottom:top]
        if not bc_view.empty:
            bc_view.plot(ax=ax, color="#2b6cb0", linewidth=2.0, linestyle="--", zorder=6, label="BC line")
            label_lines(ax, bc_view, "Name", bounds)

    ax.set_xlim(left, right)
    ax.set_ylim(bottom, top)
    ax.set_aspect("equal", adjustable="box")
    ax.set_title(title)
    ax.set_xlabel("Easting (ft)")
    ax.set_ylabel("Northing (ft)")
    add_north_arrow(ax)
    add_scale_bar(ax)


def add_gate_rectangles(ax, gates, facecolor="#e17c05", edgecolor="#9a3412", label="Gate opening"):
    label_used = False
    if gates.empty:
        return
    for _, gate in gates.iterrows():
        stations = gate.get("OpeningStations", []) or []
        width = float(gate.get("Width", 0.0) or 0.0)
        height = float(gate.get("Height", 0.0) or 0.0)
        invert = float(gate.get("InvertElevation", np.nan))
        if width <= 0 or height <= 0 or np.isnan(invert):
            continue
        for sta in stations:
            rect = Rectangle(
                (float(sta) - width / 2.0, invert),
                width,
                height,
                facecolor=facecolor,
                edgecolor=edgecolor,
                linewidth=1.1,
                alpha=0.28,
                label=label if not label_used else None,
                zorder=8,
            )
            ax.add_patch(rect)
            ax.text(
                float(sta),
                invert + height,
                gate["GateName"],
                fontsize=7,
                ha="center",
                va="bottom",
                color=edgecolor,
                rotation=90,
                zorder=9,
            )
            label_used = True


def set_profile_ylim(ax, profiles, gates=None):
    if gates is None:
        gates = pd.DataFrame()
    values = []
    for profile in profiles:
        values.extend(profile["Elevation"].dropna().tolist())
    if not gates.empty:
        for _, gate in gates.iterrows():
            invert = float(gate.get("InvertElevation", np.nan))
            height = float(gate.get("Height", 0.0) or 0.0)
            if not np.isnan(invert):
                values.extend([invert, invert + height])
    if values:
        ymin, ymax = min(values), max(values)
        pad = max((ymax - ymin) * 0.08, 2.0)
        ax.set_ylim(ymin - pad, ymax + pad)


def add_gate_station_markers(ax, gates):
    if gates.empty:
        return
    top = ax.get_ylim()[1]
    for _, gate in gates.iterrows():
        stations = gate.get("OpeningStations", []) or []
        for sta in stations:
            ax.axvline(float(sta), color="#9a3412", linewidth=0.9, alpha=0.45, zorder=3)
            ax.text(
                float(sta),
                top,
                gate["GateName"],
                fontsize=7,
                ha="center",
                va="top",
                color="#9a3412",
                rotation=90,
                zorder=9,
            )


def plot_gate_band(ax, gates):
    if gates.empty:
        ax.text(0.5, 0.5, "No gates", transform=ax.transAxes, ha="center", va="center")
        return
    add_gate_rectangles(ax, gates)
    values = []
    for _, gate in gates.iterrows():
        invert = float(gate.get("InvertElevation", np.nan))
        height = float(gate.get("Height", 0.0) or 0.0)
        if not np.isnan(invert):
            values.extend([invert, invert + height])
    ymin, ymax = min(values), max(values)
    pad = max((ymax - ymin) * 0.25, 2.0)
    ax.set_ylim(ymin - pad, ymax + pad)
    ax.set_ylabel("Gate elev. (ft)")
    ax.set_xlabel("Station along connection (ft)")
    ax.set_title("Gate Opening Elevations")
    dedupe_legend(ax, loc="upper right", fontsize=7)


def plot_profile(ax, terrain_profile, crest_profile, gates, title, crest_label, show_gate_rectangles=True):
    ax.plot(
        terrain_profile["Station"],
        terrain_profile["Elevation"],
        color="0.55",
        linewidth=1.4,
        linestyle="--",
        label="Terrain profile",
        zorder=2,
    )
    ax.plot(
        crest_profile["Station"],
        crest_profile["Elevation"],
        color="#1f4e79",
        linewidth=2.1,
        label=crest_label,
        zorder=4,
    )
    if show_gate_rectangles:
        add_gate_rectangles(ax, gates)
    ax.set_title(title)
    ax.set_xlabel("Station along connection (ft)")
    ax.set_ylabel("Elevation (ft)")
    set_profile_ylim(ax, [terrain_profile, crest_profile], gates if show_gate_rectangles else None)
    dedupe_legend(ax, loc="lower right")


def save_figure(fig, filename):
    FIGURE_DIR.mkdir(parents=True, exist_ok=True)
    output_path = FIGURE_DIR / filename
    fig.savefig(output_path, dpi=180)
    print(f"Saved figure: {output_path}")


import ras_commander

print(f"Loaded: {ras_commander.__file__}")

Parameters

Python
PROJECT_NAME = "BaldEagleCrkMulti2D"
PROJECT_SUFFIX = "214_connection_authoring"
GEOM_FILE_NAME = "BaldEagleDamBrk.g13"
TARGET_CONNECTION = "Dam"

TERRAIN_RASTER_NAME = "Terrain50.dtm_20ft.tif"
TERRAIN_SAMPLE_SPACING_FT = 25.0
CREST_IMPROVEMENT_FT = 0.5

cwd = Path.cwd()
REPO_ROOT = cwd if (cwd / "ras_commander").exists() else cwd.parent
WORK_ROOT = REPO_ROOT / "working" / "connection_authoring"
WORK_ROOT.mkdir(parents=True, exist_ok=True)
FIGURE_DIR = Path(
    os.environ.get(
        "RAS_COMMANDER_NOTEBOOK_FIGURE_DIR",
        str(WORK_ROOT / "figures"),
    )
)
FIGURE_DIR.mkdir(parents=True, exist_ok=True)

print(f"Working folder: {WORK_ROOT}")
print(f"Figure folder: {FIGURE_DIR}")

Extract And Inspect The Project

Python
project_path = RasExamples.extract_project(
    PROJECT_NAME,
    output_path=WORK_ROOT,
    suffix=PROJECT_SUFFIX,
)
geom_file = project_path / GEOM_FILE_NAME
geom_hdf = project_path / f"{GEOM_FILE_NAME}.hdf"
rasmap_file = project_path / "BaldEagleDamBrk.rasmap"
terrain_raster = project_path / "Terrain" / TERRAIN_RASTER_NAME

print(f"Project path: {project_path}")
print(f"Geometry file: {geom_file.name}")
print(f"Geometry HDF: {geom_hdf.name}")
print(f"RASMapper file: {rasmap_file.name}")
print(f"Terrain raster: {terrain_raster.relative_to(project_path)}")

assert geom_file.exists()
assert geom_hdf.exists()
assert rasmap_file.exists()
assert terrain_raster.exists()

connections = GeomLateral.get_connections(geom_file)
display_cols = ["Name", "Type", "From", "To", "NumPoints", "LinePoints", "HasGate"]
display(connections[display_cols])

mesh_areas = HdfMesh.get_mesh_areas(geom_hdf)
mesh_cells = HdfMesh.get_mesh_cell_polygons(geom_hdf)
bc_lines = HdfBndry.get_bc_lines(geom_hdf)

connection_lines = {}
for name in connections["Name"]:
    try:
        connection_lines[name] = GeomLateral.get_connection_line_coords(geom_file, name)
    except ValueError:
        pass

context_summary = pd.DataFrame([{
    "Connections": len(connections),
    "2D Flow Areas": len(mesh_areas),
    "2D Mesh Cells": len(mesh_cells),
    "BC Lines": len(bc_lines),
    "Terrain Source": str(terrain_raster.relative_to(project_path)),
    "Mesh Source": geom_hdf.name,
}])
display(context_summary)

Model Context And Visualization Targets

Before modifying the connection, an engineer needs enough spatial and hydraulic context to interpret the profile:

  • target connection alignment and adjacent storage/2D flow areas
  • 2D mesh perimeter and local cells crossed by the connection
  • downstream boundary condition locations
  • underlying terrain along and around the connection
  • weir crest station/elevation profile and gate openings

The overview below locates the target connection in the model. The detailed inspection maps that follow zoom to the connection line with a 25 percent margin around its extents.

Python
target_coords = connection_lines[TARGET_CONNECTION]
target_line = connection_line(target_coords)
overview_bounds = expanded_bounds(mesh_areas.total_bounds, fraction=0.04, min_pad=1000.0)

fig, ax = plt.subplots(figsize=(11, 7))
plot_context_map(
    ax,
    terrain_raster,
    mesh_areas,
    mesh_cells,
    bc_lines,
    overview_bounds,
    "Model Overview: Mesh, BC Lines, And SA/2D Connections",
    show_cells=False,
)

for name, coords in connection_lines.items():
    color = "#b91c1c" if name == TARGET_CONNECTION else "#7c2d12"
    linewidth = 2.8 if name == TARGET_CONNECTION else 1.4
    label = f"Target connection: {name}" if name == TARGET_CONNECTION else "Other SA/2D connections"
    ax.plot(coords["X"], coords["Y"], color=color, linewidth=linewidth, alpha=0.95, zorder=8, label=label)
    mid = connection_line(coords).interpolate(0.5, normalized=True)
    ax.text(
        mid.x,
        mid.y,
        name,
        fontsize=8,
        ha="center",
        va="center",
        bbox=dict(facecolor="white", edgecolor=color, alpha=0.75, pad=1.5),
        zorder=16,
    )

dedupe_legend(ax, loc="lower left")
fig.tight_layout()
save_figure(fig, "214_connection_authoring_model_overview.png")
plt.show()

Inspect The Target Connection

Python
original_coords = GeomLateral.get_connection_line_coords(geom_file, TARGET_CONNECTION)
original_profile = GeomLateral.get_connection_profile(geom_file, TARGET_CONNECTION)
original_gates = GeomLateral.get_connection_gates(geom_file, TARGET_CONNECTION)
terrain_profile = sample_terrain_profile(
    original_coords,
    terrain_raster,
    TERRAIN_SAMPLE_SPACING_FT,
)

print(f"Connection line points: {len(original_coords)}")
print(f"Original weir crest points: {len(original_profile)}")
print(f"Terrain profile samples: {len(terrain_profile)} at {TERRAIN_SAMPLE_SPACING_FT:g} ft spacing")
print(f"Gates: {len(original_gates)}")

gate_cols = ["GateName", "Width", "Height", "InvertElevation", "GateCoefficient", "NumOpenings", "OpeningStations"]
profile_stats = pd.DataFrame([{
    "Source": "Existing weir crest",
    "Points": len(original_profile),
    "Min Elev": original_profile["Elevation"].min(),
    "Max Elev": original_profile["Elevation"].max(),
    "Min Station": original_profile.loc[original_profile["Elevation"].idxmin(), "Station"],
}, {
    "Source": "Terrain raster sample",
    "Points": len(terrain_profile),
    "Min Elev": terrain_profile["Elevation"].min(),
    "Max Elev": terrain_profile["Elevation"].max(),
    "Min Station": terrain_profile.loc[terrain_profile["Elevation"].idxmin(), "Station"],
}])
display(profile_stats.round(3))
display(original_gates[gate_cols])

Weir Crest Profile With Terrain And Gates

Python
fig, (ax, gate_ax) = plt.subplots(
    2,
    1,
    figsize=(12, 6.2),
    sharex=True,
    gridspec_kw={"height_ratios": [4.0, 1.15]},
)
plot_profile(
    ax,
    terrain_profile,
    original_profile,
    original_gates,
    f"{TARGET_CONNECTION}: Existing Weir Crest Against Terrain",
    "Existing weir crest",
    show_gate_rectangles=False,
)
add_gate_station_markers(ax, original_gates)
plot_gate_band(gate_ax, original_gates)
fig.tight_layout()
save_figure(fig, "214_connection_authoring_existing_profile.png")
plt.show()

Connection Map View

Python
connection_bounds = expanded_bounds(target_line.bounds, fraction=0.25, min_pad=250.0)

fig, ax = plt.subplots(figsize=(11, 7))
plot_context_map(
    ax,
    terrain_raster,
    mesh_areas,
    mesh_cells,
    bc_lines,
    connection_bounds,
    f"{TARGET_CONNECTION}: Terrain, Mesh Cells, And Connection Alignment",
    show_cells=True,
)
ax.plot(original_coords["X"], original_coords["Y"], color="#b91c1c", linewidth=2.8, zorder=10, label="Connection line")
ax.scatter(original_coords["X"], original_coords["Y"], s=16, color="#fee2e2", edgecolor="#b91c1c", linewidth=0.6, zorder=11)

dedupe_legend(ax, loc="upper left")
fig.tight_layout()
save_figure(fig, "214_connection_authoring_connection_map.png")
plt.show()

Delete The Connection

Python
GeomLateral.delete_connection(geom_file, TARGET_CONNECTION, create_backup=True)

after_delete = GeomLateral.get_connections(geom_file)
display(after_delete[["Name", "Type", "From", "To"]])

assert TARGET_CONNECTION not in after_delete["Name"].values
assert len(after_delete) == len(connections) - 1
print(f"Deleted '{TARGET_CONNECTION}'. {len(after_delete)} connections remain.")

Re-Create The Connection With Modified Parameters

Python
coords_tuples = list(zip(
    original_coords["X"].tolist(),
    original_coords["Y"].tolist(),
))

dam_row = connections.loc[connections["Name"] == TARGET_CONNECTION].iloc[0]

GeomLateral.set_connection(
    geom_file,
    TARGET_CONNECTION,
    coords_tuples,
    upstream_area=dam_row["From"],
    downstream_area=dam_row["To"],
    routing_type=1,
    weir_width=120.0,
    weir_coef=2.6,
    overflow_method_2d=True,
    create_backup=False,
)

after_recreate = GeomLateral.get_connections(geom_file)
display(after_recreate[["Name", "Type", "From", "To", "NumPoints", "LinePoints"]])

assert TARGET_CONNECTION in after_recreate["Name"].values
assert len(after_recreate) == len(connections)
print(f"Re-created '{TARGET_CONNECTION}' with modified weir coefficient and width.")

Derive And Modify The Weir Crest From Terrain

Python
terrain_derived_profile = terrain_profile[["Station", "Elevation", "X", "Y"]].copy()
terrain_min_idx = terrain_derived_profile["Elevation"].idxmin()
terrain_min_elev = float(terrain_derived_profile.loc[terrain_min_idx, "Elevation"])
new_min_elev = terrain_min_elev + CREST_IMPROVEMENT_FT

modified_profile = terrain_derived_profile.copy()
modified_profile["TerrainElevation"] = terrain_derived_profile["Elevation"]
modified_profile["Elevation"] = np.maximum(modified_profile["TerrainElevation"], new_min_elev)
modified_profile["Delta"] = modified_profile["Elevation"] - modified_profile["TerrainElevation"]

GeomLateral.set_connection_profile(
    geom_file,
    TARGET_CONNECTION,
    modified_profile[["Station", "Elevation"]],
    create_backup=False,
)

updated_profile = GeomLateral.get_connection_profile(geom_file, TARGET_CONNECTION)
raised_points = modified_profile[modified_profile["Delta"] > 0].copy()

display(pd.DataFrame([{
    "Terrain Min Station": modified_profile.loc[terrain_min_idx, "Station"],
    "Terrain Min Elev": terrain_min_elev,
    "New Minimum Crest Elev": new_min_elev,
    "Improvement": CREST_IMPROVEMENT_FT,
    "Points Raised": len(raised_points),
    "Profile Points Written": len(updated_profile),
}]).round(3))
display(raised_points[["Station", "TerrainElevation", "Elevation", "Delta"]].round(3).head(12))

assert len(updated_profile) == len(modified_profile)
assert np.isclose(updated_profile["Elevation"].min(), new_min_elev, atol=0.001)
assert (modified_profile["Elevation"] >= new_min_elev - 1e-9).all()
print(
    f"Terrain-derived profile written. Minimum crest raised from "
    f"{terrain_min_elev:.2f} ft to {new_min_elev:.2f} ft."
)

Add Modified Gates

Python
new_gates = [
    {
        "GateName": "Main Gate",
        "Width": 10.0,
        "Height": 15.0,
        "InvertElevation": 590.0,
        "GateCoefficient": 0.65,
        "NumOpenings": 2,
        "OpeningStations": [5745.0, 5765.0],
    },
    {
        "GateName": "Auxiliary Gate",
        "Width": 5.0,
        "Height": 8.0,
        "InvertElevation": 595.0,
        "GateCoefficient": 0.60,
        "NumOpenings": 1,
        "OpeningStations": [3500.0],
    },
]

GeomLateral.set_connection_gates(
    geom_file,
    TARGET_CONNECTION,
    new_gates,
    create_backup=False,
)

restored_gates = GeomLateral.get_connection_gates(geom_file, TARGET_CONNECTION)
display(restored_gates[gate_cols])

assert len(restored_gates) == 2
for i, expected in enumerate(new_gates):
    actual = restored_gates.iloc[i]
    assert actual["GateName"] == expected["GateName"], f"Gate {i} name mismatch"
    assert actual["Width"] == expected["Width"], f"Gate {i} width mismatch"
    assert actual["Height"] == expected["Height"], f"Gate {i} height mismatch"
    assert np.isclose(actual["InvertElevation"], expected["InvertElevation"], atol=0.01), (
        f"Gate {i} invert mismatch: expected {expected['InvertElevation']}, got {actual['InvertElevation']}"
    )
    assert np.isclose(actual["GateCoefficient"], expected["GateCoefficient"], atol=0.01), (
        f"Gate {i} coefficient mismatch: expected {expected['GateCoefficient']}, got {actual['GateCoefficient']}"
    )
    assert actual["NumOpenings"] == expected["NumOpenings"], f"Gate {i} num openings mismatch"
    actual_stations = [float(s) for s in actual["OpeningStations"]]
    expected_stations = [float(s) for s in expected["OpeningStations"]]
    assert actual_stations == expected_stations, (
        f"Gate {i} stations mismatch: expected {expected_stations}, got {actual_stations}"
    )
print(f"Wrote {len(restored_gates)} gates to '{TARGET_CONNECTION}' — all fields verified.")

Before/After Profile Comparison

Python
fig, axes = plt.subplots(
    3,
    1,
    figsize=(12, 9.4),
    sharex=False,
    gridspec_kw={"height_ratios": [3.7, 2.5, 1.15]},
)
ax = axes[0]
ax.plot(
    terrain_derived_profile["Station"],
    terrain_derived_profile["Elevation"],
    color="0.30",
    linewidth=1.8,
    linestyle="--",
    label="Terrain-derived crest before floor",
    alpha=0.85,
    zorder=5,
)
ax.plot(
    modified_profile["Station"],
    modified_profile["Elevation"],
    color="#b91c1c",
    linewidth=2.2,
    label="Modified crest after +0.5 ft floor",
    zorder=4,
)
ax.axhline(new_min_elev, color="#b91c1c", linestyle=":", linewidth=1.5, label=f"New minimum {new_min_elev:.2f} ft")
ax.scatter(
    modified_profile.loc[terrain_min_idx, "Station"],
    new_min_elev,
    marker="v",
    s=70,
    color="#b91c1c",
    edgecolor="white",
    linewidth=0.8,
    zorder=7,
    label="Raised terrain minimum",
)
ax.set_title(f"{TARGET_CONNECTION}: Terrain-Derived Crest Before And After Improvement")
ax.set_xlabel("")
ax.set_ylabel("Elevation (ft)")
set_profile_ylim(ax, [terrain_derived_profile, modified_profile])
add_gate_station_markers(ax, restored_gates)
dedupe_legend(ax, loc="lower right")

zoom_pad = 50.0
zoom_min = max(float(raised_points["Station"].min()) - zoom_pad, 0.0)
zoom_max = min(float(raised_points["Station"].max()) + zoom_pad, float(modified_profile["Station"].max()))
zoom = modified_profile[(modified_profile["Station"] >= zoom_min) & (modified_profile["Station"] <= zoom_max)]
terrain_zoom = terrain_derived_profile[
    (terrain_derived_profile["Station"] >= zoom_min)
    & (terrain_derived_profile["Station"] <= zoom_max)
]

ax_zoom = axes[1]
ax_zoom.plot(
    terrain_zoom["Station"],
    terrain_zoom["Elevation"],
    color="0.25",
    linewidth=1.8,
    linestyle="--",
    label="Before",
    zorder=5,
)
ax_zoom.plot(
    zoom["Station"],
    zoom["Elevation"],
    color="#b91c1c",
    linewidth=2.2,
    label="After",
    zorder=4,
)
ax_zoom.fill_between(
    zoom["Station"],
    zoom["TerrainElevation"],
    zoom["Elevation"],
    where=zoom["Delta"] > 0,
    color="#fca5a5",
    alpha=0.6,
    label="Filled low spots",
    zorder=3,
)
ax_zoom.axhline(new_min_elev, color="#b91c1c", linestyle=":", linewidth=1.4)
ax_zoom.annotate(
    f"+{CREST_IMPROVEMENT_FT:.1f} ft minimum",
    xy=(float(modified_profile.loc[terrain_min_idx, "Station"]), new_min_elev),
    xytext=(float(modified_profile.loc[terrain_min_idx, "Station"]) + 120.0, new_min_elev + 1.1),
    arrowprops=dict(arrowstyle="->", color="#b91c1c", linewidth=1.0),
    color="#b91c1c",
    fontsize=8,
    bbox=dict(facecolor="white", edgecolor="#fca5a5", alpha=0.85, pad=2),
)
ax_zoom.set_xlim(zoom_min, zoom_max)
ax_zoom.set_title("Focused View Of +0.5 ft Minimum Crest Improvement", pad=10)
ax_zoom.set_xlabel("")
ax_zoom.set_ylabel("Elevation (ft)")
set_profile_ylim(ax_zoom, [terrain_zoom, zoom])
dedupe_legend(ax_zoom, loc="lower right", fontsize=8)

plot_gate_band(axes[2], restored_gates)
axes[2].set_xlim(
    float(terrain_derived_profile["Station"].min()),
    float(terrain_derived_profile["Station"].max()),
)
fig.subplots_adjust(hspace=0.55)
save_figure(fig, "214_connection_authoring_before_after_profile.png")
plt.show()

Map View Of Modified Connection

Python
fig, ax = plt.subplots(figsize=(11, 7))
plot_context_map(
    ax,
    terrain_raster,
    mesh_areas,
    mesh_cells,
    bc_lines,
    connection_bounds,
    f"{TARGET_CONNECTION}: Plan View With Crest Raise Highlighted",
    show_cells=True,
)

points = modified_profile[["X", "Y"]].to_numpy()
segments = np.stack([points[:-1], points[1:]], axis=1)
segment_delta = np.maximum(
    modified_profile["Delta"].to_numpy()[:-1],
    modified_profile["Delta"].to_numpy()[1:],
)
norm = Normalize(vmin=0.0, vmax=max(CREST_IMPROVEMENT_FT, float(segment_delta.max())))
line_collection = LineCollection(
    segments,
    cmap="inferno",
    norm=norm,
    linewidth=3.4,
    zorder=12,
)
line_collection.set_array(segment_delta)
ax.add_collection(line_collection)

low_pt = modified_profile.loc[terrain_min_idx]
ax.scatter(
    [low_pt["X"]],
    [low_pt["Y"]],
    marker="v",
    s=90,
    color="#b91c1c",
    edgecolor="white",
    linewidth=0.9,
    zorder=14,
    label="Raised low point",
)
ax.text(
    low_pt["X"],
    low_pt["Y"],
    f"STA {low_pt['Station']:.0f}\n+{CREST_IMPROVEMENT_FT:.1f} ft min",
    fontsize=8,
    ha="left",
    va="bottom",
    bbox=dict(facecolor="white", edgecolor="#b91c1c", alpha=0.85, pad=2),
    zorder=15,
)
cbar = fig.colorbar(line_collection, ax=ax, fraction=0.035, pad=0.02)
cbar.set_label("Crest raise above terrain-derived profile (ft)")
dedupe_legend(ax, loc="upper left")
fig.tight_layout()
save_figure(fig, "214_connection_authoring_modified_connection_map.png")
plt.show()

Summary

Python
new_coords = GeomLateral.get_connection_line_coords(geom_file, TARGET_CONNECTION)
final_conns = GeomLateral.get_connections(geom_file)

summary = pd.DataFrame([{
    "Connection": TARGET_CONNECTION,
    "Original Weir Coef": 3.82,
    "Modified Weir Coef": 2.60,
    "Original Weir Width": 100,
    "Modified Weir Width": 120,
    "Terrain Source": str(terrain_raster.relative_to(project_path)),
    "Terrain Samples": len(terrain_derived_profile),
    "Minimum Crest Raise": f"+{CREST_IMPROVEMENT_FT:.1f} ft",
    "Points Floored": int((modified_profile["Delta"] > 0).sum()),
    "Original Gates": len(original_gates),
    "Modified Gates": len(restored_gates),
    "Line Points Preserved": len(new_coords) == len(original_coords),
    "Connections Preserved": len(final_conns) == len(connections),
}])
display(summary)

assert len(final_conns) == len(connections)
assert len(new_coords) == len(original_coords)
assert len(restored_gates) == 2
print("Connection authoring workflow complete.")

Validate Write Format With HEC-RAS Compute

The file-level assertions above confirm Python can read back what it wrote, but the ultimate validation is running HEC-RAS on the modified geometry and checking that preprocessing and simulation complete without errors. This catches format issues that round-trip checks cannot: malformed fixed-width encoding, invalid coordinate precision, or structural corruption in the connection block.

The demonstration cells above delete and re-create the Dam connection, which removes breach parameters required by Plan 04 (a dam break simulation). To isolate the profile/gate write validation, we extract a fresh project copy and apply set_connection_profile and set_connection_gates with the modified data — then run HEC-RAS directly on that modified geometry without restoring. This proves the API's write format produces geometry HEC-RAS can preprocess and simulate.

Python
import shutil
import re
from ras_commander import init_ras_project, RasCmdr
from ras_commander.hdf import HdfResultsPlan

logging.getLogger("ras_commander").setLevel(logging.INFO)

# Extract a FRESH project copy for compute validation (independent of demo above)
validate_path = WORK_ROOT / "compute_validation"
if validate_path.exists():
    shutil.rmtree(validate_path)
validate_path.mkdir(parents=True, exist_ok=True)

fresh_project = RasExamples.extract_project(PROJECT_NAME, validate_path, suffix="validate")
fresh_geom = fresh_project / GEOM_FILE_NAME

# --------------------------------------------------------------------------
# Write MODIFIED profile and gates to the fresh geometry (no restore).
# HEC-RAS will run on this modified file to prove the write format is valid.
# --------------------------------------------------------------------------

# Create a validation profile: terrain-derived but floored at 651 ft to stay
# above the mesh cell base elevations (~649.6 ft). This profile differs from
# the original (298 points vs 6, terrain-sampled shape vs flat) while remaining
# physically valid for simulation.
VALIDATION_FLOOR_ELEV = 651.0
validation_profile = modified_profile[["Station", "Elevation"]].copy()
validation_profile["Elevation"] = np.maximum(validation_profile["Elevation"], VALIDATION_FLOOR_ELEV)

# Write the validation profile
GeomLateral.set_connection_profile(
    fresh_geom, TARGET_CONNECTION, validation_profile, create_backup=False
)

# Verify profile round-trip with full assertions
readback_profile = GeomLateral.get_connection_profile(fresh_geom, TARGET_CONNECTION)
assert len(readback_profile) == len(validation_profile), (
    f"Profile length mismatch: wrote {len(validation_profile)}, read {len(readback_profile)}"
)
assert np.isclose(readback_profile["Elevation"].min(), VALIDATION_FLOOR_ELEV, atol=0.001), (
    f"Profile min elevation mismatch: expected {VALIDATION_FLOOR_ELEV}, got {readback_profile['Elevation'].min()}"
)
assert np.allclose(
    readback_profile["Elevation"].values,
    validation_profile["Elevation"].values,
    atol=0.01,
), "Profile elevation values diverged beyond 0.01 ft tolerance"
print(f"Profile write verified: {len(readback_profile)} points, "
      f"min elev {readback_profile['Elevation'].min():.3f} ft, "
      f"max elev {readback_profile['Elevation'].max():.3f} ft")

# Write a modified gate that keeps the original name (Plan 04's unsteady file
# has time series for "Gate #1") but uses completely different dimensions,
# position, and coefficient — proving the gate write format is valid.
validation_gates = [
    {
        "GateName": "Gate #1",
        "Width": 12.0,
        "Height": 18.0,
        "InvertElevation": 585.0,
        "GateCoefficient": 0.70,
        "NumOpenings": 2,
        "OpeningStations": [5700.0, 5800.0],
    },
]
GeomLateral.set_connection_gates(fresh_geom, TARGET_CONNECTION, validation_gates, create_backup=False)

# Verify gate round-trip with FULL field assertions
readback_gates = GeomLateral.get_connection_gates(fresh_geom, TARGET_CONNECTION)
assert len(readback_gates) == len(validation_gates), (
    f"Gate count mismatch: wrote {len(validation_gates)}, read {len(readback_gates)}"
)
for i, expected in enumerate(validation_gates):
    actual = readback_gates.iloc[i]
    assert actual["GateName"] == expected["GateName"], f"Gate {i} name mismatch"
    assert actual["Width"] == expected["Width"], f"Gate {i} width mismatch"
    assert actual["Height"] == expected["Height"], f"Gate {i} height mismatch"
    assert np.isclose(actual["InvertElevation"], expected["InvertElevation"], atol=0.01), (
        f"Gate {i} invert mismatch: expected {expected['InvertElevation']}, got {actual['InvertElevation']}"
    )
    assert np.isclose(actual["GateCoefficient"], expected["GateCoefficient"], atol=0.01), (
        f"Gate {i} coefficient mismatch: expected {expected['GateCoefficient']}, got {actual['GateCoefficient']}"
    )
    assert actual["NumOpenings"] == expected["NumOpenings"], f"Gate {i} num openings mismatch"
    actual_stations = [float(s) for s in actual["OpeningStations"]]
    expected_stations = [float(s) for s in expected["OpeningStations"]]
    assert actual_stations == expected_stations, (
        f"Gate {i} stations mismatch: expected {expected_stations}, got {actual_stations}"
    )
print(f"Gate write verified: {len(readback_gates)} gate(s), all fields match")
print(f"  Modified: Width={validation_gates[0]['Width']}, Height={validation_gates[0]['Height']}, "
      f"Invert={validation_gates[0]['InvertElevation']}, Coef={validation_gates[0]['GateCoefficient']}, "
      f"Openings={validation_gates[0]['NumOpenings']}, Stations={validation_gates[0]['OpeningStations']}")

# --------------------------------------------------------------------------
# Run HEC-RAS on the MODIFIED geometry (not the original).
# This is the definitive format validation ��� HEC-RAS parses the written file.
# --------------------------------------------------------------------------

# Initialize and find plan using g13
ras_val = init_ras_project(fresh_project, "7.0")
plans_using_g13 = ras_val.plan_df[ras_val.plan_df["Geom File"] == "13"]
assert not plans_using_g13.empty, "No plan found using geometry file .g13"
plan_number = plans_using_g13.iloc[0]["plan_number"]

# Update Program Version to match installed HEC-RAS (avoids GUI popup)
plan_file = fresh_project / f"BaldEagleDamBrk.p{plan_number.zfill(2)}"
plan_text = plan_file.read_text()
plan_text = re.sub(r"Program Version=.*", f"Program Version={ras_val.ras_version}", plan_text)
plan_file.write_text(plan_text)

geom_text = fresh_geom.read_text()
geom_text = re.sub(r"Program Version=.*", f"Program Version={ras_val.ras_version}", geom_text)
fresh_geom.write_text(geom_text)
print(f"\nUpdated plan {plan_number} and geometry Program Version to {ras_val.ras_version}")

print(f"Running Plan {plan_number} on MODIFIED geometry (298-pt profile floored at "
      f"{VALIDATION_FLOOR_ELEV} ft + modified gate)...")
RasCmdr.compute_plan(plan_number, ras_object=ras_val, force_geompre=True, num_cores=4)

hdf_path = ras_val.plan_df.loc[
    ras_val.plan_df["plan_number"] == plan_number, "HDF_Results_Path"
].iloc[0]
assert hdf_path and Path(hdf_path).exists(), f"HDF file not created for plan {plan_number}"

messages = HdfResultsPlan.get_compute_messages(Path(hdf_path))
lines = messages.splitlines() if messages else []

# Filter for real errors (exclude iteration convergence table headers and metrics)
errors = [l for l in lines if "ERROR" in l.upper()
          and "VOLUME ACCOUNTING" not in l.upper()
          and "ITERATIONS" not in l.upper()
          and "WSEL ERROR" not in l.upper()]
warnings_list = [l for l in lines if "WARNING" in l.upper()]
unstable = [l for l in lines if "UNSTABLE" in l.upper() or "NEGATIVE DEPTH" in l.upper()]

if errors:
    print(f"\nERRORS ({len(errors)}):")
    for e in errors[:10]:
        print(f"  {e}")
    raise RuntimeError(f"HEC-RAS reported {len(errors)} error(s) with modified geometry")

if unstable:
    print(f"\nInstability warnings ({len(unstable)}) — physics-related, not format errors:")
    for w in unstable[:5]:
        print(f"  {w}")

if warnings_list:
    print(f"\nWarnings ({len(warnings_list)}): first 5 shown below")
    for w in warnings_list[:5]:
        print(f"  {w}")

print(f"\n{'='*70}")
print(f"VALIDATION RESULT")
print(f"{'='*70}")
print(f"  Geometry file:    {fresh_geom.name} (MODIFIED — not original)")
print(f"  Profile:          {len(readback_profile)} points (terrain-derived, floored at {VALIDATION_FLOOR_ELEV} ft)")
print(f"  Gate:             Modified width/height/invert/coef/openings/stations")
print(f"  HEC-RAS version:  {ras_val.ras_version}")
print(f"  Plan {plan_number}:          Preprocessing + simulation COMPLETED")
print(f"  Format errors:    {len(errors)}")
print(f"  HDF results:      {Path(hdf_path).name}")
print(f"{'='*70}")
print(f"  set_connection_profile write format: VALIDATED (HEC-RAS accepted)")
print(f"  set_connection_gates write format:   VALIDATED (HEC-RAS accepted)")
print(f"{'='*70}")

Verify Structure In HDF Results

The compute validation above proves HEC-RAS accepted the modified geometry. As final proof that the Dam connection actively participated in the simulation, extract its time series data from the HDF results using the structure-level HDF extraction APIs.

Python
from ras_commander.hdf import HdfStruc, HdfResultsBreach

hdf_file = Path(hdf_path)

# List all SA/2D connections with time series in the results
connections_in_results = HdfStruc.list_sa2d_connections(hdf_file)
print(f"SA/2D connections in HDF results: {connections_in_results}")
assert TARGET_CONNECTION in connections_in_results, (
    f"'{TARGET_CONNECTION}' not found in HDF results — expected among {connections_in_results}"
)

# Extract structure-level time series for the Dam connection
structure_ts = HdfResultsBreach.get_structure_variables(hdf_file, TARGET_CONNECTION)
print(f"\nTime series shape: {structure_ts.shape}")
print(f"Columns: {list(structure_ts.columns)}")
print(f"Time range: {structure_ts['datetime'].iloc[0]}{structure_ts['datetime'].iloc[-1]}")

# Summary statistics proving the connection carried flow
peak_total_flow = structure_ts['total_flow'].abs().max()
peak_weir_flow = structure_ts['weir_flow'].abs().max()
max_hw = structure_ts['hw'].max()
max_tw = structure_ts['tw'].max()

summary = pd.DataFrame([{
    "Connection": TARGET_CONNECTION,
    "Time Steps": len(structure_ts),
    "Peak Total Flow (cfs)": f"{peak_total_flow:,.1f}",
    "Peak Weir Flow (cfs)": f"{peak_weir_flow:,.1f}",
    "Max Headwater (ft)": f"{max_hw:.2f}",
    "Max Tailwater (ft)": f"{max_tw:.2f}",
}])
display(summary)

# Show first and last few time steps
print("\nFirst 5 time steps:")
display(structure_ts.head())
print("\nLast 5 time steps:")
display(structure_ts.tail())

assert peak_total_flow > 0, "No flow through connection — structure did not participate"
print(f"\n{'='*70}")
print(f"STRUCTURE VALIDATION: '{TARGET_CONNECTION}' connection carried flow in simulation")
print(f"  Peak total flow: {peak_total_flow:,.1f} cfs")
print(f"  Peak weir flow:  {peak_weir_flow:,.1f} cfs")
print(f"  Max headwater:   {max_hw:.2f} ft")
print(f"  Max tailwater:   {max_tw:.2f} ft")
print(f"{'='*70}")
CLB Engineering Corporation  ·  LLM Forward Engineering
RAS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.