Gridded Precipitation for Rain-on-Grid 2D Modeling¶
Purpose¶
This notebook demonstrates gridded precipitation (rain-on-grid) workflows for 2D HEC-RAS models using HMS-validated precipitation methods. Unlike hydrograph boundary conditions, gridded precipitation applies directly to mesh cells.
Scope: Gridded precipitation for 2D rain-on-grid models
For hydrograph boundary condition workflows, see:
- 721_atlas14_comprehensive_workflow.ipynb - Complete hydrograph BC workflow with bulk execution
Workflow Overview¶
- Project Setup - Extract BaldEagleCrkMulti2D project (configured for gridded precipitation)
- Mesh Analysis - Extract and visualize 2D mesh cells that receive precipitation
- Storm Generation - Generate uniform storms using all HMS-validated methods
- Spatial Variance Assessment - Assess whether uniform rainfall is appropriate
- Gridded Precipitation Concept - Future conversion workflow (placeholder)
- Execution - Optional plan execution with results extraction
Key Difference: Gridded vs Hydrograph Precipitation¶
| Aspect | Hydrograph BC (Notebook 721) | Gridded Precipitation (This Notebook) |
|---|---|---|
| Input Type | Time series at boundary | Spatially distributed over mesh |
| Application | Upstream/downstream BCs | Rain-on-grid 2D modeling |
| Project | Davis (1D inflows) | BaldEagleCrkMulti2D (2D mesh) |
| Spatial Variance | Not applicable | Critical consideration |
Prerequisites¶
- HEC-RAS 6.5+ installed (for plan execution)
- hms-commander package (for HMS-equivalent methods)
- BaldEagleCrkMulti2D example project (included with ras-commander)
Related Notebooks¶
720_precipitation_methods_comprehensive.ipynb- Method comparison (no execution)721_atlas14_comprehensive_workflow.ipynb- Hydrograph BC workflow725_atlas14_spatial_variance.ipynb- Spatial variance analysis (detailed)
Part 1: Setup and Imports¶
# =============================================================================
# DEVELOPMENT MODE TOGGLE
# =============================================================================
USE_LOCAL_SOURCE = True # Set to True for local development, False for pip package
if USE_LOCAL_SOURCE:
import sys
from pathlib import Path
local_path = str(Path.cwd().parent)
if local_path not in sys.path:
sys.path.insert(0, local_path)
print(f"LOCAL SOURCE MODE: Loading from {local_path}/ras_commander")
else:
print("PIP PACKAGE MODE: Loading installed ras-commander")
# Import ras-commander core
from ras_commander import (
RasExamples,
init_ras_project,
ras,
RasCmdr,
RasPlan,
RasPrj
)
from ras_commander.hdf import HdfResultsPlan, HdfMesh
# Import precipitation modules
from ras_commander.precip import (
StormGenerator, # Alternating Block Method (NOT HMS-equivalent)
Atlas14Storm, # HMS-equivalent Atlas 14 temporal distributions
FrequencyStorm, # HMS-equivalent TP-40 temporal pattern
ScsTypeStorm, # HMS-equivalent SCS Type I/IA/II/III
Atlas14Grid, # Gridded PFE access
Atlas14Variance, # Spatial variance analysis
ATLAS14_AVAILABLE, # Availability flag for Atlas14Storm
FREQUENCY_STORM_AVAILABLE, # Availability flag for FrequencyStorm
SCS_TYPE_AVAILABLE # Availability flag for ScsTypeStorm
)
import ras_commander
print(f"Loaded ras_commander: {ras_commander.__file__}")
# Check HMS-equivalent method availability
print(f"\nHMS-Equivalent Methods Availability:")
print(f" Atlas14Storm: {'[OK]' if ATLAS14_AVAILABLE else '[--]'}")
print(f" FrequencyStorm: {'[OK]' if FREQUENCY_STORM_AVAILABLE else '[--]'}")
print(f" ScsTypeStorm: {'[OK]' if SCS_TYPE_AVAILABLE else '[--]'}")
if not (ATLAS14_AVAILABLE and FREQUENCY_STORM_AVAILABLE and SCS_TYPE_AVAILABLE):
print("\n Install hms-commander for all HMS-equivalent methods:")
print(" pip install hms-commander")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
# Configure output
pd.set_option('display.max_rows', 50)
pd.set_option('display.float_format', '{:.4f}'.format)
print("Standard libraries imported successfully")
# =============================================================================
# CONFIGURATION - Edit these for your project
# =============================================================================
# Project Configuration
PROJECT_NAME = "BaldEagleCrkMulti2D" # Example project with 2D mesh
RAS_VERSION = "7.0" # HEC-RAS version
TEMPLATE_PLAN = "06" # Plan 06 is configured for gridded precipitation
# Storm Parameters
STORM_DURATION_HOURS = 24 # 24-hour storm
TIME_INTERVAL_MIN = 60 # 1-hour intervals for output
# Demonstration AEP (single event for this notebook)
# Using 100-year event as representative design storm
DEMO_AEP_PERCENT = 1.0 # 1% AEP = 100-year event
DEMO_AEP_NAME = "100yr"
# Execution Control
# WARNING: Set to True to execute HEC-RAS Plan 06
EXECUTE_PLAN = True # Set to True to run HEC-RAS
NUM_CORES = 2 # CPU cores for execution
# Spatial Variance Threshold
# If variance exceeds this threshold, spatially variable rainfall should be considered
VARIANCE_THRESHOLD_PCT = 10.0 # Percent range threshold
print("Configuration Summary:")
print(f" Project: {PROJECT_NAME}")
print(f" Plan: {TEMPLATE_PLAN} (configured for gridded precipitation)")
print(f" Storm Duration: {STORM_DURATION_HOURS} hours")
print(f" Demo AEP: {DEMO_AEP_NAME} ({DEMO_AEP_PERCENT}% AEP)")
print(f" Execute Plan: {EXECUTE_PLAN}")
print(f" Variance Threshold: {VARIANCE_THRESHOLD_PCT}%")
# =============================================================================
# 1.1 Extract Example Project
# =============================================================================
print("Extracting BaldEagleCrkMulti2D example project...")
try:
# Extract with suffix to avoid conflicts
project_path = RasExamples.extract_project(PROJECT_NAME, suffix="722_gridded")
print(f"[OK] Extracted to: {project_path}")
# Verify path exists
if not project_path.exists():
raise FileNotFoundError(f"Project not found at {project_path}")
PROJECT_AVAILABLE = True
except Exception as e:
print(f"[!!] Error extracting project: {e}")
PROJECT_AVAILABLE = False
project_path = None
# =============================================================================
# 1.2 Initialize Project
# =============================================================================
if PROJECT_AVAILABLE:
print("Initializing HEC-RAS project...")
init_ras_project(project_path, RAS_VERSION)
print(f"\n[OK] Project initialized: {ras.project_name}")
print(f" Project folder: {ras.project_folder}")
print(f" Plans available: {len(ras.plan_df)}")
print(f" Geometries available: {len(ras.geom_df)}")
# Display available plans
print("\nAvailable Plans:")
display(ras.plan_df[['plan_number', 'Plan Title', 'Geom File', 'Flow File', 'HDF_Results_Path']])
else:
print("[!!] Project not available - cannot continue")
# =============================================================================
# 1.3 Verify Gridded Precipitation Configuration
# =============================================================================
if PROJECT_AVAILABLE:
print(f"Verifying Plan {TEMPLATE_PLAN} for gridded precipitation...")
# Check if plan exists
plan_row = ras.plan_df[ras.plan_df['plan_number'] == TEMPLATE_PLAN]
if len(plan_row) > 0:
print(f"\n[OK] Plan {TEMPLATE_PLAN} found:")
print(f" Title: {plan_row['Plan Title'].values[0]}")
print(f" Geometry: {plan_row['Geom File'].values[0]}")
print(f" Flow: {plan_row['Flow File'].values[0]}")
PLAN_AVAILABLE = True
else:
print(f"\n[!!] Plan {TEMPLATE_PLAN} not found!")
print(f" Available plans: {ras.plan_df['plan_number'].tolist()}")
PLAN_AVAILABLE = False
# Find geometry HDF
geom_hdfs = list(project_path.glob("*.g*.hdf"))
if geom_hdfs:
geom_hdf = geom_hdfs[0]
print(f"\n[OK] Geometry HDF: {geom_hdf.name}")
else:
print("\n[!!] No geometry HDF found")
geom_hdf = None
else:
PLAN_AVAILABLE = False
geom_hdf = None
Part 2: Mesh Analysis¶
Extract and visualize the 2D mesh cells that will receive precipitation. Each mesh cell in a rain-on-grid model receives precipitation input independently.
# =============================================================================
# 2.1 Get 2D Mesh Area Names
# =============================================================================
if PROJECT_AVAILABLE and geom_hdf:
print("Extracting 2D mesh area names...")
mesh_area_names = HdfMesh.get_mesh_area_names(geom_hdf)
if mesh_area_names:
print(f"\n[OK] Found {len(mesh_area_names)} 2D flow area(s):")
for name in mesh_area_names:
print(f" - {name}")
else:
print("\n[!!] No 2D flow areas found in geometry")
else:
print("[!!] Project or geometry HDF not available")
mesh_area_names = []
# =============================================================================
# 2.2 Extract Mesh Cell Points
# =============================================================================
if PROJECT_AVAILABLE and geom_hdf and mesh_area_names:
print("Extracting mesh cell center points...")
try:
mesh_cells_gdf = HdfMesh.get_mesh_cell_points(geom_hdf)
print(f"\n[OK] Extracted {len(mesh_cells_gdf)} mesh cell center points")
print(f"\nMesh cell statistics:")
print(f" Total cells: {len(mesh_cells_gdf)}")
# Show by mesh area
if 'mesh_name' in mesh_cells_gdf.columns:
print(f"\nCells by 2D flow area:")
cell_counts = mesh_cells_gdf.groupby('mesh_name').size()
for name, count in cell_counts.items():
print(f" {name}: {count:,} cells")
# Show sample
print(f"\nSample cells:")
display(mesh_cells_gdf.head())
except Exception as e:
print(f"[!!] Error extracting mesh cells: {e}")
mesh_cells_gdf = None
else:
print("[!!] Cannot extract mesh cells - prerequisites not available")
mesh_cells_gdf = None
# =============================================================================
# 2.3 Extract Mesh Area Perimeters
# =============================================================================
if PROJECT_AVAILABLE and geom_hdf:
print("Extracting 2D flow area perimeters...")
try:
mesh_areas_gdf = HdfMesh.get_mesh_areas(geom_hdf)
if len(mesh_areas_gdf) > 0:
print(f"\n[OK] Extracted {len(mesh_areas_gdf)} 2D flow area perimeter(s)")
# Calculate area in square miles (approximate)
# Note: This assumes projected CRS - actual calculation depends on projection
print(f"\nFlow area bounds:")
bounds = mesh_areas_gdf.total_bounds
print(f" Min X: {bounds[0]:.2f}")
print(f" Min Y: {bounds[1]:.2f}")
print(f" Max X: {bounds[2]:.2f}")
print(f" Max Y: {bounds[3]:.2f}")
else:
print("\n[!!] No mesh area perimeters found")
except Exception as e:
print(f"[!!] Error extracting mesh areas: {e}")
mesh_areas_gdf = None
else:
mesh_areas_gdf = None
# =============================================================================
# 2.4 Visualize Mesh Domain
# =============================================================================
if mesh_cells_gdf is not None and mesh_areas_gdf is not None:
print("Visualizing mesh domain...")
fig, ax = plt.subplots(figsize=(12, 8))
# Plot mesh area perimeters
mesh_areas_gdf.plot(ax=ax, facecolor='lightblue', edgecolor='blue',
linewidth=2, alpha=0.3)
# Plot mesh cell points (sample if too many)
n_cells = len(mesh_cells_gdf)
if n_cells > 5000:
# Sample for visualization
sample_gdf = mesh_cells_gdf.sample(n=5000, random_state=42)
sample_note = f" (5,000 sample of {n_cells:,})"
else:
sample_gdf = mesh_cells_gdf
sample_note = ""
sample_gdf.plot(ax=ax, markersize=1, color='red', alpha=0.5)
# Create custom legend handles (geopandas plot doesn't support 'label' well)
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
legend_elements = [
Patch(facecolor='lightblue', edgecolor='blue', alpha=0.3, label='2D Flow Area'),
Line2D([0], [0], marker='o', color='w', markerfacecolor='red',
markersize=5, alpha=0.5, label='Cell Centers')
]
ax.legend(handles=legend_elements, loc='upper right')
ax.set_xlabel('X Coordinate', fontsize=11)
ax.set_ylabel('Y Coordinate', fontsize=11)
title_str = (f'BaldEagleCrkMulti2D - 2D Mesh Domain\n'
f'{n_cells:,} Mesh Cells{sample_note}')
ax.set_title(title_str,
fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("[OK] Mesh domain visualized")
print(f"Note: In rain-on-grid modeling, each of these {n_cells:,} cells")
print(" receives precipitation input independently.")
else:
print("[--] Cannot visualize - mesh data not available")
Part 3: Generate Uniform Storms (All HMS Methods)¶
For demonstration purposes, we generate uniform (spatially invariant) storms. In uniform rainfall, every mesh cell receives the same precipitation depth over time.
This notebook generates representative storms using all 4 available HMS methods: 1. Atlas14Storm - 2 quartiles (All Cases, Fourth Quartile) 2. FrequencyStorm - TP-40 pattern 3. ScsTypeStorm - Type II (standard for most of US)
Note: These are uniform storms - the depth is the same everywhere. Part 4 assesses whether spatial variation is needed.
# =============================================================================
# 3.1 Get Representative Precipitation Depth
# =============================================================================
if PROJECT_AVAILABLE and geom_hdf:
print("Getting representative precipitation depth for model domain...")
try:
# Use Atlas14Variance to get representative depth
quick_stats = Atlas14Variance.analyze_quick(
geom_hdf=geom_hdf,
duration=STORM_DURATION_HOURS,
return_period=int(100 / DEMO_AEP_PERCENT) # Convert AEP to return period
)
# Use mean depth for uniform rainfall
TOTAL_DEPTH_INCHES = quick_stats['mean']
print(f"\n[OK] Atlas 14 PFE for project domain ({DEMO_AEP_NAME}, {STORM_DURATION_HOURS}-hr):")
print(f" Min: {quick_stats['min']:.2f} inches")
print(f" Max: {quick_stats['max']:.2f} inches")
print(f" Mean: {quick_stats['mean']:.2f} inches (using for uniform rainfall)")
print(f" Range: {quick_stats['range']:.2f} inches ({quick_stats['range_pct']:.1f}%)")
DEPTH_AVAILABLE = True
except Exception as e:
print(f"[!!] Error getting precipitation depth: {e}")
print("\nUsing default depth for demonstration...")
TOTAL_DEPTH_INCHES = 5.75 # Approximate for Pennsylvania 100-yr 24-hr
DEPTH_AVAILABLE = False
print(f"\nUsing total depth: {TOTAL_DEPTH_INCHES:.2f} inches")
else:
print("[!!] Project not available")
TOTAL_DEPTH_INCHES = 5.75
DEPTH_AVAILABLE = False
# =============================================================================
# 3.2 Generate Storms Using All Available Storm Methods
# =============================================================================
print("="*80)
print(f"GENERATING UNIFORM STORM SUITE ({DEMO_AEP_NAME}, {STORM_DURATION_HOURS}-hr)")
print("="*80)
print(f"\nTotal Depth: {TOTAL_DEPTH_INCHES:.2f} inches")
storms_suite = {}
# ----- Atlas14Storm: 2 Quartiles -----
if ATLAS14_AVAILABLE:
print("\n Atlas14Storm (HMS-equivalent, NOAA patterns):")
# Determine state and region from project location
# BaldEagleCrkMulti2D is in Pennsylvania (PA), Region 1
STATE = "pa" # Pennsylvania
REGION = 1 # Atlas 14 Region 1
quartiles = ["All Cases", "Fourth Quartile"]
for quartile in quartiles:
short_name = quartile.replace(" ", "").replace("Quartile", "Q")
key = f"Atlas14_{short_name}"
try:
hyeto = Atlas14Storm.generate_hyetograph(
total_depth_inches=TOTAL_DEPTH_INCHES,
state=STATE,
region=REGION,
duration_hours=STORM_DURATION_HOURS,
aep_percent=DEMO_AEP_PERCENT,
quartile=quartile
)
storms_suite[key] = hyeto
print(f" [OK] {key}: {len(hyeto)} steps, {hyeto['incremental_depth'].sum():.6f} in")
except Exception as e:
print(f" [!!] {key}: Error - {e}")
else:
print("\n [--] Atlas14Storm not available")
# ----- FrequencyStorm: TP-40 -----
if FREQUENCY_STORM_AVAILABLE:
print("\n FrequencyStorm (HMS-equivalent, TP-40):")
key = "FrequencyStorm_TP40"
try:
hyeto = FrequencyStorm.generate_hyetograph(
total_depth_inches=TOTAL_DEPTH_INCHES,
total_duration_min=STORM_DURATION_HOURS * 60,
time_interval_min=TIME_INTERVAL_MIN,
peak_position_pct=67.0 # TP-40 standard
)
storms_suite[key] = hyeto
print(f" [OK] {key}: {len(hyeto)} steps, {hyeto['incremental_depth'].sum():.6f} in")
except Exception as e:
print(f" [!!] {key}: Error - {e}")
else:
print("\n [--] FrequencyStorm not available")
# ----- ScsTypeStorm: Type II -----
if SCS_TYPE_AVAILABLE:
print("\n ScsTypeStorm (HMS-equivalent, TR-55):")
key = "ScsType_II"
try:
hyeto = ScsTypeStorm.generate_hyetograph(
total_depth_inches=TOTAL_DEPTH_INCHES,
scs_type='II',
time_interval_min=TIME_INTERVAL_MIN
)
storms_suite[key] = hyeto
print(f" [OK] {key}: {len(hyeto)} steps, {hyeto['incremental_depth'].sum():.6f} in")
except Exception as e:
print(f" [!!] {key}: Error - {e}")
else:
print("\n [--] ScsTypeStorm not available")
# Summary
print("\n" + "="*80)
print(f"STORM GENERATION COMPLETE: {len(storms_suite)} storms generated")
print("="*80)
# =============================================================================
# 3.3 Visualize Generated Storms
# =============================================================================
if storms_suite:
n_storms = len(storms_suite)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Plot 1: Incremental Depth (Hyetographs)
ax1 = axes[0]
colors = ['steelblue', 'darkgreen', 'darkorange', 'darkred']
for idx, (name, hyeto) in enumerate(storms_suite.items()):
color = colors[idx % len(colors)]
# Determine time axis based on hyetograph length
if 'Atlas14' in name:
# Atlas14 uses 30-min intervals
time_hours = np.arange(len(hyeto)) * 0.5
else:
# FrequencyStorm and ScsType use TIME_INTERVAL_MIN
time_hours = np.arange(len(hyeto)) * TIME_INTERVAL_MIN / 60
ax1.plot(time_hours, hyeto['incremental_depth'], linewidth=2, label=name, color=color, alpha=0.8)
ax1.set_xlabel('Time (hours)', fontsize=11)
ax1.set_ylabel('Incremental Depth (inches)', fontsize=11)
ax1.set_title('Hyetograph Comparison (Incremental)', fontsize=12, fontweight='bold')
ax1.legend(loc='upper right', fontsize=9)
ax1.grid(True, alpha=0.3)
# Plot 2: Cumulative Depth
ax2 = axes[1]
for idx, (name, hyeto) in enumerate(storms_suite.items()):
color = colors[idx % len(colors)]
if 'Atlas14' in name:
time_hours = np.arange(len(hyeto)) * 0.5
else:
time_hours = np.arange(len(hyeto)) * TIME_INTERVAL_MIN / 60
ax2.plot(time_hours, hyeto['cumulative_depth'], linewidth=2, label=name, color=color, alpha=0.8)
ax2.axhline(TOTAL_DEPTH_INCHES, color='black', linestyle='--', linewidth=1.5,
label=f'Target: {TOTAL_DEPTH_INCHES:.2f} in')
ax2.set_xlabel('Time (hours)', fontsize=11)
ax2.set_ylabel('Cumulative Depth (inches)', fontsize=11)
ax2.set_title('Cumulative Depth Comparison', fontsize=12, fontweight='bold')
ax2.legend(loc='lower right', fontsize=9)
ax2.grid(True, alpha=0.3)
plt.suptitle(f'{DEMO_AEP_NAME}, {STORM_DURATION_HOURS}-Hour Storm Suite\nUniform Rainfall: {TOTAL_DEPTH_INCHES:.2f} inches',
fontsize=13, fontweight='bold', y=1.02)
plt.tight_layout()
plt.show()
# Print depth conservation summary
print("\nDepth Conservation Summary:")
print("-" * 60)
print(f"{'Storm Type':<25s} {'Total Depth':>15s} {'Error':>15s}")
print("-" * 60)
for name, hyeto in storms_suite.items():
total = hyeto['incremental_depth'].sum()
error = abs(total - TOTAL_DEPTH_INCHES)
print(f"{name:<25s} {total:>15.6f} {error:>15.9f}")
print("-" * 60)
print("\nNote: HMS-equivalent methods conserve depth at 10^-6 precision.")
else:
print("[--] No storms generated")
Part 4: Spatial Variance Assessment¶
Before applying uniform rainfall to all mesh cells, we should assess whether spatial variation exists in the Atlas 14 precipitation data across the model domain.
Decision Criterion: - If range percentage < 10%: Uniform rainfall is appropriate - If range percentage >= 10%: Consider spatially variable rainfall
# =============================================================================
# 4.1 Full Variance Analysis
# =============================================================================
if PROJECT_AVAILABLE and geom_hdf:
print("Running spatial variance analysis...")
try:
# Analyze across multiple durations and return periods
variance_results = Atlas14Variance.analyze(
geom_hdf=geom_hdf,
durations=[6, 12, 24],
return_periods=[10, 50, 100],
extent_source="2d_flow_area",
variance_denominator='min'
)
print("\n[OK] Variance analysis complete")
print("\nVariance Statistics:")
display(variance_results[['mesh_area', 'duration_hr', 'return_period_yr',
'min_inches', 'max_inches', 'mean_inches', 'range_pct']])
VARIANCE_AVAILABLE = True
except Exception as e:
print(f"[!!] Error in variance analysis: {e}")
variance_results = None
VARIANCE_AVAILABLE = False
else:
print("[!!] Cannot run variance analysis - project not available")
variance_results = None
VARIANCE_AVAILABLE = False
# =============================================================================
# 4.2 Uniformity Decision
# =============================================================================
if VARIANCE_AVAILABLE and variance_results is not None:
print("Assessing whether uniform rainfall is appropriate...")
# Check uniformity
ok, message = Atlas14Variance.is_uniform_rainfall_appropriate(
variance_results,
threshold_pct=VARIANCE_THRESHOLD_PCT
)
print(f"\n{'[OK]' if ok else '[!!]'} {message}")
# Show max variance event
max_var_row = variance_results.loc[variance_results['range_pct'].idxmax()]
print(f"\nHighest variance event:")
print(f" Duration: {int(max_var_row['duration_hr'])}-hour")
print(f" Return Period: {int(max_var_row['return_period_yr'])}-year")
print(f" Range: {max_var_row['range_pct']:.1f}%")
print(f" Min: {max_var_row['min_inches']:.2f} in, Max: {max_var_row['max_inches']:.2f} in")
UNIFORM_APPROPRIATE = ok
else:
print("[--] Variance analysis not available")
print("\nDefaulting to uniform rainfall assumption.")
UNIFORM_APPROPRIATE = True
# =============================================================================
# 4.3 Visualize Variance by Duration
# =============================================================================
if VARIANCE_AVAILABLE and variance_results is not None:
fig, ax = plt.subplots(figsize=(10, 6))
# Group by return period
durations = sorted(variance_results['duration_hr'].unique())
return_periods = sorted(variance_results['return_period_yr'].unique())
x = np.arange(len(durations))
width = 0.25
colors = ['steelblue', 'darkgreen', 'darkorange']
for i, rp in enumerate(return_periods):
data = variance_results[variance_results['return_period_yr'] == rp]
data_sorted = data.sort_values('duration_hr')
offset = (i - len(return_periods)/2 + 0.5) * width
ax.bar(x + offset, data_sorted['range_pct'], width,
label=f'{int(rp)}-yr', color=colors[i % len(colors)], alpha=0.8)
# Threshold line
ax.axhline(VARIANCE_THRESHOLD_PCT, color='red', linestyle='--',
linewidth=2, label=f'Threshold ({VARIANCE_THRESHOLD_PCT}%)')
ax.set_xlabel('Duration (hours)', fontsize=11)
ax.set_ylabel('Spatial Variance (% range)', fontsize=11)
ax.set_title('Atlas 14 Spatial Variance by Duration and Return Period',
fontsize=12, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels([f'{int(d)}-hr' for d in durations])
ax.legend(title='Return Period', fontsize=9)
ax.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
# Interpretation
if UNIFORM_APPROPRIATE:
print("\nInterpretation: Spatial variance is low (< threshold).")
print(" Uniform rainfall is appropriate for this model domain.")
else:
print("\nInterpretation: Spatial variance exceeds threshold.")
print(" Consider spatially variable rainfall for more accurate results.")
print(" See Part 5 for future spatially distributed rainfall workflows.")
Part 5: Atlas14 Gridded Integration (In Progress)¶
This section focuses on direct Atlas14 gridded precipitation for HEC-RAS rain-on-grid models using AORC-style NetCDF input.
Current Status: Direct Atlas14 -> SHG NetCDF integration is in progress (see ROADMAP.md). Hyetograph <-> gridded conversion utilities are not planned.
Direct Integration Vision¶
When implemented, the workflow will be:
- Fetch Atlas14 PFE grid for the project extent (WGS84)
- Apply HMS-equivalent temporal distribution to build a time-series grid
- Reproject to SHG (EPSG:5070) at native resolution or finer using nearest neighbor
- Write NetCDF in HEC-RAS-compatible format (APCP_surface, time/y/x)
- Configure unsteady file with GDAL gridded precipitation
- Execute and compare results (gridded vs uniform)
# =============================================================================
# 5.1 Gridded Precipitation Workflow Concept
# =============================================================================
print("="*80)
print("GRIDDED PRECIPITATION WORKFLOW - CONCEPT")
print("="*80)
print("""
CURRENT CAPABILITIES:
----------------------
[OK] Generate HMS-equivalent hyetographs (Atlas14Storm, FrequencyStorm, ScsTypeStorm)
[OK] Extract mesh cell locations (HdfMesh.get_mesh_cell_points)
[OK] Assess spatial variance (Atlas14Variance.analyze)
[OK] Determine if uniform rainfall is appropriate
DIRECT INTEGRATION ROADMAP (Planned - see ROADMAP.md):
------------------------------------------------------
[--] Export Atlas 14 gridded AEP storms to HEC-RAS-ready NetCDF
- Source: Atlas14Grid depth grids (pf_###_hr, inches)
- Apply temporal distribution to build time-series per grid cell
- Reproject to SHG (EPSG:5070) at native resolution or smaller
- Use nearest-neighbor resampling to minimize smoothing
[--] Configure unsteady file for GDAL Raster File(s)
- RasUnsteady.set_gridded_precipitation(...)
NOT PLANNED:
------------
[--] Hyetograph <-> gridded conversion utilities
""")
print("\nPLACEHOLDER CODE (Future Implementation):")
print("-" * 50)
# =============================================================================
# 5.2 Future Workflow (Placeholder - Not Yet Implemented)
# =============================================================================
# This code block shows what the future workflow WILL look like
# when direct Atlas 14 gridded export is implemented.
future_workflow = '''
# ==================== FUTURE IMPLEMENTATION ====================
# Direct Atlas 14 gridded NetCDF export (AORC-compatible)
from ras_commander.precip import Atlas14Grid, Atlas14Storm
from ras_commander import RasUnsteady
# Step 1: Fetch Atlas 14 depth grid for the project extent (WGS84)
pfe = Atlas14Grid.get_pfe_from_project(
geom_hdf=geom_hdf,
durations=[STORM_DURATION_HOURS],
return_periods=[int(100 / DEMO_AEP_PERCENT)]
)
# Step 2: Build temporal distribution (HMS-equivalent)
hyeto = Atlas14Storm.generate_hyetograph(
total_depth_inches=TOTAL_DEPTH_INCHES,
state="pa",
region=1,
aep_percent=DEMO_AEP_PERCENT
)
# Step 3: Convert depth grid + hyetograph to time-series grid
# (planned helper; outputs NetCDF with dims time, y, x and APCP_surface variable)
# netcdf_path = Atlas14Grid.export_gridded_aep_netcdf(
# pfe_data=pfe,
# hyetograph=hyeto,
# target_crs="EPSG:5070",
# target_resolution_m=800.0, # native or finer
# resampling="nearest"
# )
# Step 4: Configure unsteady file to use GDAL Raster File(s)
# RasUnsteady.set_gridded_precipitation(
# unsteady_file="07",
# netcdf_path=netcdf_path,
# interpolation="Nearest"
# )
'''
print(future_workflow)
# =============================================================================
# 5.3 What We CAN Do Now
# =============================================================================
print("CURRENT STATE SUMMARY")
print("="*60)
if storms_suite and mesh_cells_gdf is not None:
n_cells = len(mesh_cells_gdf)
n_storms = len(storms_suite)
print(f"\nMesh cells identified: {n_cells:,}")
print(f"Storms generated: {n_storms}")
print(f"Spatial variance assessed: {'Yes' if VARIANCE_AVAILABLE else 'No'}")
print(f"Uniform rainfall appropriate: {'Yes' if UNIFORM_APPROPRIATE else 'No'}")
print(f"\nGenerated Storms:")
for name, hyeto in storms_suite.items():
print(f" - {name}: {hyeto['incremental_depth'].sum():.4f} inches total")
print(f"\nNext Steps (Manual Workflow):")
print(f" 1. Build Atlas14 gridded NetCDF (SHG, native resolution)")
print(f" 2. Align NetCDF time axis to plan simulation window")
print(f" 3. Configure unsteady file for GDAL gridded precip")
print(f" 4. Execute plan in HEC-RAS GUI or via RasCmdr")
print(f" 5. Review results and compute messages")
else:
print("\n[!!] Setup incomplete - please run Parts 1-4")
# =============================================================================
# 5.4 Create Uniform Precipitation Plan for Comparison
# =============================================================================
# Strategy: Extract the NEXRAD area-average temporal pattern from Plan 06's
# HDF results, then REDUCE it by 10% to create a deliberately different
# uniform hyetograph. The 10% reduction serves two purposes:
# 1. It PROVES the uniform hyetograph is actually used by HEC-RAS (if it
# were ignored, WSE would be identical to Plan 06).
# 2. Over a small domain with <10% spatial variance, gridded vs uniform
# precipitation alone produces negligible WSE differences -- the 10%
# reduction guarantees a measurable, interpretable signal.
#
# If Plan 06 hasn't been executed yet, this cell runs it first so that
# p06.hdf exists for area-average extraction.
print("="*80)
print("CREATING UNIFORM PRECIPITATION PLAN FOR COMPARISON")
print("="*80)
if not PROJECT_AVAILABLE:
print("[--] Project not available")
COMPARISON_PLAN_CREATED = False
else:
try:
import h5py
from ras_commander import RasPlan, RasUnsteady
# -----------------------------------------------------------------
# Step 1: Ensure Plan 06 HDF exists (execute if needed)
# -----------------------------------------------------------------
p06_hdf = project_path / f"{ras.project_name}.p{TEMPLATE_PLAN}.hdf"
if not p06_hdf.exists():
print(f" [..] Plan {TEMPLATE_PLAN} HDF not found -- executing now...")
import time
t0 = time.time()
RasCmdr.compute_plan(TEMPLATE_PLAN, num_cores=NUM_CORES, ras_object=ras)
elapsed = time.time() - t0
print(f" [OK] Plan {TEMPLATE_PLAN} executed in {elapsed:.1f}s")
# Refresh project state
from ras_commander import init_ras_project
ras = init_ras_project(str(project_path), RAS_VERSION)
p06_hdf = project_path / f"{ras.project_name}.p{TEMPLATE_PLAN}.hdf"
if not p06_hdf.exists():
raise FileNotFoundError(
f"Plan {TEMPLATE_PLAN} execution did not produce HDF"
)
else:
print(f" [OK] Found existing Plan {TEMPLATE_PLAN} HDF: {p06_hdf.name}")
# -----------------------------------------------------------------
# Step 2: Extract NEXRAD area-average precipitation from HDF
# -----------------------------------------------------------------
flow_area_name = mesh_area_names[0] if mesh_area_names else "BaldEagleCr"
with h5py.File(p06_hdf, 'r') as f:
precip_base = 'Event Conditions/Meteorology/Precipitation'
# Cell indexes map mesh cells to NEXRAD grid cell columns
cell_idx_path = f'{precip_base}/2D Flow Areas/{flow_area_name}/Cell Indexes'
cell_indexes = f[cell_idx_path][:]
unique_nexrad_cells = np.unique(cell_indexes)
# Precipitation values: (timesteps x nexrad_cells)
# Data Type = "per-cum" means each value is the depth DURING that
# period (period-cumulative), i.e., already incremental.
precip_values = f[f'{precip_base}/Values'][:]
# Filter to NEXRAD cells overlapping the 2D flow area
area_precip = precip_values[:, unique_nexrad_cells]
# Area-average across cells for each timestep
# These ARE incremental depths (per-cum = depth per period)
area_avg_incremental = np.nanmean(area_precip, axis=1)
n_timesteps = len(area_avg_incremental)
print(f" [OK] Extracted NEXRAD area-average from {p06_hdf.name}")
print(f" {n_timesteps} timesteps, "
f"{len(unique_nexrad_cells)} NEXRAD cells in flow area")
# -----------------------------------------------------------------
# Step 3: Build hyetograph from per-cum (already incremental) data
# then reduce by 10% to create a measurable difference
# -----------------------------------------------------------------
incremental_raw = np.maximum(area_avg_incremental, 0.0) # clamp FP noise
# Report the original (unreduced) depth for reference
original_depth = np.sum(incremental_raw)
print(f" Original area-avg depth: {original_depth:.2f} inches")
# Apply 10% reduction -- this guarantees a visible WSE difference
# so we can confirm the uniform hyetograph is actually driving Plan 07.
REDUCTION_FACTOR = 0.90
incremental = incremental_raw * REDUCTION_FACTOR
cumulative = np.cumsum(incremental)
total_depth = cumulative[-1]
peak_rate = incremental.max()
peak_hour = int(np.argmax(incremental)) + 1
print(f" Reduced depth (x{REDUCTION_FACTOR}): {total_depth:.2f} inches ")
print(f" Reduction: {(1 - REDUCTION_FACTOR)*100:.0f}% less precipitation than gridded plan")
print(f" Peak rate: {peak_rate:.3f} in/hr at hour {peak_hour}")
# Build hyetograph DataFrame
hours = np.arange(1, n_timesteps + 1, dtype=float)
hyeto_df = pd.DataFrame({
'hour': hours,
'incremental_depth': incremental,
'cumulative_depth': cumulative
})
# -----------------------------------------------------------------
# Step 4: Clone Plan 06 -> Plan 07
# -----------------------------------------------------------------
new_plan = RasPlan.clone_plan(
TEMPLATE_PLAN,
new_plan_shortid="07",
new_title=f"Uniform Precipitation - {DEMO_AEP_NAME}",
ras_object=ras
)
print(f" [OK] Cloned Plan {TEMPLATE_PLAN} -> Plan {new_plan}")
# Clone u03 (same base as gridded plan) so all BCs are identical
new_unsteady = RasPlan.clone_unsteady(
"03",
new_title="Uniform Precipitation",
ras_object=ras
)
print(f" [OK] Cloned u03 -> u{new_unsteady}")
RasPlan.set_unsteady(new_plan, new_unsteady, ras_object=ras)
unsteady_file = project_path / f"{ras.project_name}.u{new_unsteady}"
# -----------------------------------------------------------------
# Step 5: Modify unsteady file -- replace gridded precip with
# boundary-level hyetograph
# -----------------------------------------------------------------
content = unsteady_file.read_text(encoding='utf-8', errors='ignore')
lines = content.split('\n')
# 5a: Insert precipitation boundary before Met Point Raster Parameters
met_raster_idx = None
for i, line in enumerate(lines):
if line.startswith('Met Point Raster Parameters='):
met_raster_idx = i
break
if met_raster_idx is None:
raise ValueError(
"Could not find 'Met Point Raster Parameters=' in u03 clone"
)
precip_boundary_lines = [
(f"Boundary Location= , ,"
f" , , ,{flow_area_name:<16},"
f" , ,"
f" "),
"Interval=1HOUR",
"Precipitation Hydrograph= 0 ",
]
for j, pline in enumerate(precip_boundary_lines):
lines.insert(met_raster_idx + j, pline)
# 5b: Disable Met BC gridded precipitation (prevent dual-location conflict)
remove_prefixes = [
'Met BC=Precipitation|Mode=',
'Met BC=Precipitation|Expanded View=',
'Met BC=Precipitation|Constant Units=',
'Met BC=Precipitation|Gridded Source=',
'Met BC=Precipitation|Gridded DSS Filename=',
'Met BC=Precipitation|Gridded DSS Pathname=',
]
new_lines = []
for line in lines:
if line.startswith('Precipitation Mode=Enable'):
new_lines.append('Precipitation Mode=')
elif any(line.startswith(prefix) for prefix in remove_prefixes):
continue
else:
new_lines.append(line)
unsteady_file.write_text('\n'.join(new_lines), encoding='utf-8')
print(f" [OK] Modified {unsteady_file.name}:")
print(f" Disabled Met BC gridded precipitation")
print(f" Added boundary-level precipitation placeholder")
# -----------------------------------------------------------------
# Step 6: Write NEXRAD area-average hyetograph via RasUnsteady API
# -----------------------------------------------------------------
RasUnsteady.set_precipitation_hyetograph(
str(unsteady_file), hyeto_df, boundary_name=flow_area_name
)
print(f" [OK] Applied NEXRAD area-average hyetograph:")
print(f" {len(hyeto_df)} hourly steps, {total_depth:.2f} in total")
COMPARISON_PLAN_CREATED = True
UNIFORM_PLAN = new_plan
print(f"\n COMPARISON READY:")
print(f" Plan {TEMPLATE_PLAN}: Gridded (spatially varying) NEXRAD precipitation")
print(f" Plan {new_plan}: Uniform (area-average) at {REDUCTION_FACTOR:.0%} of gridded depth")
print(f" Differences: spatial distribution + 10% depth reduction")
print(f" The 10% reduction confirms the uniform hyetograph is used by HEC-RAS")
except Exception as e:
print(f"[!!] Error creating comparison plan: {e}")
import traceback
traceback.print_exc()
COMPARISON_PLAN_CREATED = False
Part 6: Optional Plan Execution¶
This section provides optional execution of Plan 06 if the project is properly configured for gridded precipitation.
WARNING: Set EXECUTE_PLAN = True in Part 1 to enable execution.
# =============================================================================
# 6.1 Execute Plans (Gridded + Optional Uniform Comparison)
# =============================================================================
print("="*80)
print("PLAN EXECUTION")
print("="*80)
if not EXECUTE_PLAN:
print("\n[--] Execution DISABLED (EXECUTE_PLAN = True)")
print("\nTo enable execution:")
print(" 1. Set EXECUTE_PLAN = True in Part 1 configuration cell")
print(" 2. Re-run the configuration cell")
print(" 3. Re-run this cell")
EXECUTION_SUCCESS = False
elif not (PROJECT_AVAILABLE and PLAN_AVAILABLE):
print("\n[!!] Cannot execute - project or plan not available")
EXECUTION_SUCCESS = False
else:
import time
# Build list of plans to execute
plans_to_run = [TEMPLATE_PLAN]
plan_descriptions = {TEMPLATE_PLAN: "Gridded Precipitation"}
if COMPARISON_PLAN_CREATED:
plans_to_run.append(UNIFORM_PLAN)
plan_descriptions[UNIFORM_PLAN] = "Uniform Precipitation"
print(f"\nExecuting {len(plans_to_run)} plans for comparison:")
else:
print(f"\nExecuting {len(plans_to_run)} plan (comparison plan not created):")
for plan_num in plans_to_run:
print(f" - Plan {plan_num}: {plan_descriptions.get(plan_num, 'Unknown')}")
print(f"\nCores per plan: {NUM_CORES}")
print("-" * 40)
execution_results = {}
start_time = time.time()
for plan_num in plans_to_run:
plan_start = time.time()
print(f"\nExecuting Plan {plan_num} ({plan_descriptions.get(plan_num, '')})...")
try:
success = RasCmdr.compute_plan(
plan_num,
num_cores=NUM_CORES,
ras_object=ras
)
plan_elapsed = time.time() - plan_start
if success:
print(f" [OK] Plan {plan_num} completed in {plan_elapsed:.1f} seconds")
execution_results[plan_num] = True
else:
print(f" [!!] Plan {plan_num} returned False after {plan_elapsed:.1f} seconds")
execution_results[plan_num] = False
except Exception as e:
print(f" [!!] Plan {plan_num} error: {e}")
execution_results[plan_num] = False
total_elapsed = time.time() - start_time
successful = sum(execution_results.values())
print("\n" + "-" * 40)
print(f"Execution Summary: {successful}/{len(plans_to_run)} plans successful")
print(f"Total time: {total_elapsed:.1f} seconds")
EXECUTION_SUCCESS = all(execution_results.values()) and successful > 0
# Display results summary from results_df (if execution was performed)
# Shows execution status, completion, errors/warnings, and runtime for all plans
if EXECUTE_PLAN and EXECUTION_SUCCESS:
# Re-initialize from computed folder to get updated results_df
computed_path = project_path.parent / f"{project_path.name} [Computed]"
if computed_path.exists():
ras = init_ras_project(computed_path, RAS_VERSION)
print("Results summary:")
display(ras.results_df[['plan_number', 'plan_title', 'completed', 'has_errors', 'has_warnings', 'runtime_complete_process_hours']])
else:
# Use original project path
ras = init_ras_project(project_path, RAS_VERSION)
display(ras.results_df[['plan_number', 'plan_title', 'completed', 'has_errors', 'has_warnings', 'runtime_complete_process_hours']])
# =============================================================================
# 6.2 Extract Results (If Executed)
# =============================================================================
if EXECUTE_PLAN and EXECUTION_SUCCESS:
print("Extracting results from executed plans...")
from ras_commander.hdf import HdfResultsMesh
# Build list of plans to analyze
plans_to_analyze = [TEMPLATE_PLAN]
plan_labels = {TEMPLATE_PLAN: "Gridded Precipitation"}
if COMPARISON_PLAN_CREATED:
plans_to_analyze.append(UNIFORM_PLAN)
plan_labels[UNIFORM_PLAN] = "Uniform Precipitation"
for plan_num in plans_to_analyze:
try:
# Find plan HDF
plan_hdfs = list(project_path.glob(f"*.p{plan_num}.hdf"))
if plan_hdfs:
plan_hdf = plan_hdfs[0]
print(f"\n{'-'*60}")
print(f"Plan {plan_num}: {plan_labels.get(plan_num, 'Unknown')}")
print(f"HDF File: {plan_hdf.name}")
# Extract maximum WSE from 2D mesh results
max_ws_gdf = HdfResultsMesh.get_mesh_max_ws(plan_hdf)
if max_ws_gdf is not None and len(max_ws_gdf) > 0:
wse_col = [c for c in max_ws_gdf.columns if 'water_surface' in c.lower()][0]
print(f"\nMaximum Water Surface Elevation (2D Mesh):")
print(f" Count: {len(max_ws_gdf):,} cells")
print(f" Min: {max_ws_gdf[wse_col].min():.2f} ft")
print(f" Max: {max_ws_gdf[wse_col].max():.2f} ft")
print(f" Mean: {max_ws_gdf[wse_col].mean():.2f} ft")
print(f" Std: {max_ws_gdf[wse_col].std():.2f} ft")
else:
print("\n[!!] No 2D mesh WSE data found in results")
else:
print(f"\n[!!] Plan HDF not found for Plan {plan_num}")
except Exception as e:
print(f"\n[!!] Error extracting results for Plan {plan_num}: {e}")
else:
print("[--] No results to extract (execution skipped or failed)")
Part 7: Spatial Analysis Maps¶
This section provides two critical visualizations for understanding precipitation and flood response spatial patterns:
- WSE Difference Map: Shows how Atlas 14 precipitation depths vary across the project extent
- Maximum WSE Map: Shows the maximum water surface elevation across mesh cells from executed plan
Engineering Significance¶
Spatial Variance Map: - High variance (>10%) indicates spatially variable rainfall should be considered - Low variance confirms uniform rainfall is appropriate - Provides visual confirmation of variance statistics
Maximum WSE Map: - Shows flood extent and depth from the executed storm event - Enables comparison across different temporal distributions when multiple runs are performed - Critical for identifying areas of highest flood risk
# =============================================================================
# 7.0 Helper Functions for Spatial Analysis
# =============================================================================
# These functions analyze which precipitation approach produces highest max WSE
# at each mesh cell and generate visualization maps.
def analyze_dominant_method(project_path, plan_methods, ras_object=None):
"""
Analyze which precipitation method produces highest max WSE at each mesh cell.
Args:
project_path: Path to HEC-RAS project (or [Computed] folder)
plan_methods: dict of {method_name: plan_number}
ras_object: Optional RasPrj object
Returns:
GeoDataFrame with columns:
- mesh_name, cell_id, geometry (Polygon)
- dominant_method (str): Name of winning method or "No Difference"/"Ambiguous"
- max_wse (float): Highest WSE value across all methods
- delta_wse (float): Difference between 1st and 2nd highest
- {method_name}_wse (float): WSE for each method
"""
from ras_commander.hdf import HdfResultsMesh, HdfMesh
from ras_commander import init_ras_project, RasPrj
import pandas as pd
import numpy as np
method_names = list(plan_methods.keys())
n_methods = len(method_names)
TIE_TOLERANCE = 0.001 # ft
# Re-initialize from project_path to get correct HDF_Results_Path in plan_df
# This is critical after compute_parallel() which creates [Computed] folder
# Use a local RasPrj to avoid overwriting the global ras object
if ras_object is None:
ras_local = init_ras_project(
project_path,
RAS_VERSION,
ras_object=RasPrj()
)
else:
ras_local = ras_object
# Step 1: Extract max WSE from each plan using plan_df
wse_data = {}
cell_polygons = None
for method_name, plan_num in plan_methods.items():
# Get HDF path from plan_df (authoritative source)
plan_row = ras_local.plan_df[ras_local.plan_df['plan_number'] == plan_num]
if len(plan_row) == 0:
print(f" [!!] Plan {plan_num} not found in plan_df for {method_name}")
continue
hdf_path_str = plan_row['HDF_Results_Path'].iloc[0]
if hdf_path_str is None or pd.isna(hdf_path_str):
print(f" [!!] No HDF file for {method_name} (Plan {plan_num}) - not executed yet?")
continue
hdf_path = Path(hdf_path_str)
if not hdf_path.exists():
print(f" [!!] HDF file doesn't exist: {hdf_path}")
continue
# Get max WSE
max_ws_gdf = HdfResultsMesh.get_mesh_max_ws(hdf_path)
wse_col = [c for c in max_ws_gdf.columns if 'water_surface' in c.lower()][0]
wse_data[method_name] = max_ws_gdf.set_index(['mesh_name', 'cell_id'])[wse_col]
# Get polygons from first HDF (they're all the same mesh)
if cell_polygons is None:
try:
cell_polygons = HdfMesh.get_mesh_cell_polygons(hdf_path)
except:
pass
if not wse_data:
raise ValueError("No valid WSE data extracted from any plan")
# Step 2: Create comparison DataFrame
wse_df = pd.DataFrame(wse_data)
wse_df.columns = [f"{m}_wse" for m in wse_df.columns]
# Step 3: Compute dominant method for each cell
wse_values = wse_df.values # NumPy array for fast operations
method_cols = [f"{m}_wse" for m in method_names]
# Find max and argmax
max_wse = np.nanmax(wse_values, axis=1)
argmax_idx = np.nanargmax(wse_values, axis=1)
dominant = np.array(method_names)[argmax_idx]
# Find 2nd highest for delta calculation
sorted_wse = np.sort(wse_values, axis=1)[:, ::-1] # Descending
second_highest = sorted_wse[:, 1] if wse_values.shape[1] > 1 else sorted_wse[:, 0]
delta_wse = max_wse - second_highest
# Detect ties (values within tolerance of max)
is_tie = np.sum(np.abs(wse_values - max_wse[:, None]) < TIE_TOLERANCE, axis=1) > 1
# Label ties based on number of methods
tie_label = "No Difference" if n_methods == 2 else "Ambiguous"
dominant = np.where(is_tie, tie_label, dominant)
# Step 4: Build result DataFrame
result_df = wse_df.copy()
result_df['dominant_method'] = dominant
result_df['max_wse'] = max_wse
result_df['delta_wse'] = delta_wse
result_df = result_df.reset_index()
# Step 5: Merge with polygons
if cell_polygons is not None:
cell_polygons['cell_id'] = cell_polygons['cell_id'].astype(int)
result_df['cell_id'] = result_df['cell_id'].astype(int)
merged = cell_polygons.merge(
result_df,
on=['mesh_name', 'cell_id'],
how='left'
)
return gpd.GeoDataFrame(merged, geometry='geometry')
else:
# Fallback: return without polygons (scatter plot mode)
return result_df
def generate_dominant_method_figures(result_gdf, aep_name, output_folder, method_names):
"""
Generate all three figures for comparing precipitation approaches.
Saves to:
- {output_folder}/{aep}_01_dominant_approach.png
- {output_folder}/{aep}_02_max_wse.png
- {output_folder}/{aep}_03_wse_difference.png
"""
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches
output_folder = Path(output_folder)
output_folder.mkdir(parents=True, exist_ok=True)
# Sanitize AEP name for filename
aep_safe = aep_name.replace(' ', '_').replace('-', '_').lower()
# Define color palette for methods
colors = plt.cm.Set3(np.linspace(0, 1, len(method_names) + 2))
method_colors = {m: colors[i] for i, m in enumerate(method_names)}
method_colors['No Difference'] = 'lightgray'
method_colors['Ambiguous'] = 'darkgray'
# ===== FIGURE 1: Dominant Method Map =====
fig1, ax1 = plt.subplots(figsize=(12, 10))
# Map categories to numeric for plotting
categories = list(method_names) + ['No Difference', 'Ambiguous']
cat_to_num = {c: i for i, c in enumerate(categories)}
result_gdf['method_code'] = result_gdf['dominant_method'].map(cat_to_num)
# Plot
result_gdf.plot(
column='method_code',
ax=ax1,
cmap=ListedColormap([method_colors[c] for c in categories]),
edgecolor='none',
legend=False
)
# Create legend
patches = [mpatches.Patch(color=method_colors[m], label=m) for m in categories
if m in result_gdf['dominant_method'].unique()]
ax1.legend(handles=patches, loc='upper right', title='Dominant Approach')
# Add summary statistics
counts = result_gdf['dominant_method'].value_counts()
total = len(result_gdf)
stats_text = "Cell Distribution:\n" + "\n".join(
[f" {m}: {c:,} ({100*c/total:.1f}%)" for m, c in counts.items()]
)
ax1.text(0.02, 0.02, stats_text, transform=ax1.transAxes,
fontsize=9, verticalalignment='bottom',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
ax1.set_title(f'Dominant Precipitation Approach - {aep_name}', fontsize=14, fontweight='bold')
ax1.set_xlabel('X Coordinate')
ax1.set_ylabel('Y Coordinate')
ax1.set_aspect('equal')
fig1.tight_layout()
fig1.savefig(output_folder / f'{aep_safe}_01_dominant_approach.png', dpi=150, bbox_inches='tight')
plt.show()
print(f" [OK] Saved: {aep_safe}_01_dominant_approach.png")
# ===== FIGURE 2: Maximum WSE Map =====
fig2, ax2 = plt.subplots(figsize=(12, 10))
result_gdf.plot(
column='max_wse',
ax=ax2,
cmap='viridis',
legend=True,
legend_kwds={'label': 'Max WSE (ft)', 'shrink': 0.8},
edgecolor='none'
)
ax2.set_title(f'Maximum WSE (from Dominant Approach) - {aep_name}', fontsize=14, fontweight='bold')
ax2.set_xlabel('X Coordinate')
ax2.set_ylabel('Y Coordinate')
ax2.set_aspect('equal')
# Add statistics
wse_stats = f"WSE Range: {result_gdf['max_wse'].min():.2f} - {result_gdf['max_wse'].max():.2f} ft\n"
wse_stats += f"Mean: {result_gdf['max_wse'].mean():.2f} ft"
ax2.text(0.02, 0.98, wse_stats, transform=ax2.transAxes,
fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
fig2.tight_layout()
fig2.savefig(output_folder / f'{aep_safe}_02_max_wse.png', dpi=150, bbox_inches='tight')
plt.show()
print(f" [OK] Saved: {aep_safe}_02_max_wse.png")
# ===== FIGURE 3: WSE Difference Map (Gridded - Uniform) =====
fig3, ax3 = plt.subplots(figsize=(12, 10))
# Calculate difference if we have both columns
gridded_col = [c for c in result_gdf.columns if 'Gridded' in c and '_wse' in c]
uniform_col = [c for c in result_gdf.columns if 'Uniform' in c and '_wse' in c]
if gridded_col and uniform_col:
result_gdf['wse_diff'] = result_gdf[gridded_col[0]] - result_gdf[uniform_col[0]]
plot_col = 'wse_diff'
cbar_label = 'WSE Difference (ft)\nPositive = Gridded Higher'
title_suffix = 'Gridded - Uniform'
# Use diverging colormap centered at 0
vmax = max(abs(result_gdf['wse_diff'].max()), abs(result_gdf['wse_diff'].min()))
vmin = -vmax
result_gdf.plot(
column=plot_col,
ax=ax3,
cmap='RdBu_r', # Red = positive, Blue = negative
legend=True,
legend_kwds={'label': cbar_label, 'shrink': 0.8},
edgecolor='none',
vmin=vmin,
vmax=vmax
)
else:
# Fallback to delta_wse if specific columns not found
plot_col = 'delta_wse'
result_gdf.plot(
column=plot_col,
ax=ax3,
cmap='YlOrRd',
legend=True,
legend_kwds={'label': 'WSE Difference (ft)', 'shrink': 0.8},
edgecolor='none'
)
title_suffix = 'Sensitivity'
ax3.set_title(f'WSE Difference ({title_suffix}) - {aep_name}', fontsize=14, fontweight='bold')
ax3.set_xlabel('X Coordinate')
ax3.set_ylabel('Y Coordinate')
ax3.set_aspect('equal')
# Add statistics
if 'wse_diff' in result_gdf.columns:
diff_stats = f"Difference Range: {result_gdf['wse_diff'].min():.3f} to {result_gdf['wse_diff'].max():.3f} ft\n"
diff_stats += f"Mean: {result_gdf['wse_diff'].mean():.3f} ft\n"
diff_stats += f"Cells where Gridded > Uniform: {(result_gdf['wse_diff'] > 0.01).sum():,}\n"
diff_stats += f"Cells where Uniform > Gridded: {(result_gdf['wse_diff'] < -0.01).sum():,}"
else:
diff_stats = f"Delta Range: {result_gdf['delta_wse'].min():.3f} - {result_gdf['delta_wse'].max():.3f} ft\n"
diff_stats += f"Mean: {result_gdf['delta_wse'].mean():.3f} ft"
ax3.text(0.02, 0.98, diff_stats, transform=ax3.transAxes,
fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
fig3.tight_layout()
fig3.savefig(output_folder / f'{aep_safe}_03_wse_difference.png', dpi=150, bbox_inches='tight')
plt.show()
print(f" [OK] Saved: {aep_safe}_03_wse_difference.png")
return output_folder
print("[OK] Spatial analysis helper functions defined")
# =============================================================================
# 7.1 WSE Difference Map: Gridded vs Uniform Precipitation
# =============================================================================
print("="*80)
print("WSE DIFFERENCE MAP: GRIDDED vs UNIFORM PRECIPITATION")
print("="*80)
if not EXECUTE_PLAN:
print("\n[--] EXECUTE_PLAN is False - skipping WSE comparison")
print("\nThis visualization requires executed HEC-RAS results for both plans.")
VARIANCE_MAP_AVAILABLE = False
elif not COMPARISON_PLAN_CREATED:
print("\n[--] Uniform comparison plan not created - skipping")
print("\nTo enable: ensure storms_suite was generated and re-run from Part 5.")
VARIANCE_MAP_AVAILABLE = False
elif not (PROJECT_AVAILABLE and 'EXECUTION_SUCCESS' in dir() and EXECUTION_SUCCESS):
print("\n[--] No execution results available")
print("\nRun Part 6 (Plan Execution) first.")
VARIANCE_MAP_AVAILABLE = False
else:
print(f"\nComparing WSE: Gridded (Plan {TEMPLATE_PLAN}) vs Uniform (Plan {UNIFORM_PLAN})")
try:
from ras_commander.hdf import HdfResultsMesh, HdfMesh
import pandas as pd
output_folder = project_path / "precip_analysis"
output_folder.mkdir(exist_ok=True)
gridded_hdf = list(project_path.glob(f"*.p{TEMPLATE_PLAN}.hdf"))
uniform_hdf = list(project_path.glob(f"*.p{UNIFORM_PLAN}.hdf"))
if not gridded_hdf or not uniform_hdf:
missing = []
if not gridded_hdf:
missing.append(f"Gridded (Plan {TEMPLATE_PLAN})")
if not uniform_hdf:
missing.append(f"Uniform (Plan {UNIFORM_PLAN})")
print(f"\n[!!] Missing HDF files: {', '.join(missing)}")
VARIANCE_MAP_AVAILABLE = False
else:
print(f" Gridded HDF: {gridded_hdf[0].name}")
print(f" Uniform HDF: {uniform_hdf[0].name}")
print(" Extracting max WSE from gridded plan...")
gridded_ws = HdfResultsMesh.get_mesh_max_ws(gridded_hdf[0])
print(" Extracting max WSE from uniform plan...")
uniform_ws = HdfResultsMesh.get_mesh_max_ws(uniform_hdf[0])
if gridded_ws is None or uniform_ws is None or len(gridded_ws) == 0 or len(uniform_ws) == 0:
print("\n[!!] Could not extract WSE from one or both plans")
VARIANCE_MAP_AVAILABLE = False
else:
wse_col = [c for c in gridded_ws.columns if 'water_surface' in c.lower()][0]
print(f" [OK] Gridded: {len(gridded_ws):,} cells, Uniform: {len(uniform_ws):,} cells")
# Get cell polygons for map
cell_polygons = None
has_polygons = False
try:
cell_polygons = HdfMesh.get_mesh_cell_polygons(gridded_hdf[0])
if cell_polygons is not None and len(cell_polygons) > 0:
has_polygons = True
except Exception:
pass
# Merge both WSE datasets
gridded_ws['cell_id'] = gridded_ws['cell_id'].astype(int)
uniform_ws['cell_id'] = uniform_ws['cell_id'].astype(int)
merged = gridded_ws[['mesh_name', 'cell_id', wse_col]].rename(
columns={wse_col: 'wse_gridded'}
).merge(
uniform_ws[['mesh_name', 'cell_id', wse_col]].rename(
columns={wse_col: 'wse_uniform'}
),
on=['mesh_name', 'cell_id'],
how='inner'
)
merged['wse_diff'] = merged['wse_gridded'] - merged['wse_uniform']
diff_min = merged['wse_diff'].min()
diff_max = merged['wse_diff'].max()
diff_mean = merged['wse_diff'].mean()
diff_std = merged['wse_diff'].std()
print(f"\n WSE Difference (Gridded - Uniform):")
print(f" Min: {diff_min:+.3f} ft")
print(f" Max: {diff_max:+.3f} ft")
print(f" Mean: {diff_mean:+.3f} ft")
print(f" Std: {diff_std:.3f} ft")
print(f" Cells where gridded higher: {(merged['wse_diff'] > 0.01).sum():,}")
print(f" Cells where uniform higher: {(merged['wse_diff'] < -0.01).sum():,}")
# Join with polygons
if has_polygons:
cell_polygons = cell_polygons.copy()
cell_polygons['cell_id'] = cell_polygons['cell_id'].astype(int)
plot_gdf = cell_polygons.merge(merged, on=['mesh_name', 'cell_id'], how='left')
else:
plot_gdf = gridded_ws[['mesh_name', 'cell_id', 'geometry']].merge(
merged, on=['mesh_name', 'cell_id'], how='left'
)
# Create WSE difference map
fig, ax = plt.subplots(figsize=(12, 10))
vmax = max(abs(diff_min), abs(diff_max))
if vmax < 0.01:
vmax = 0.01
if has_polygons:
plot_gdf.plot(
column='wse_diff',
ax=ax,
cmap='RdBu_r',
legend=True,
legend_kwds={
'label': 'WSE Difference (ft): Gridded \u2212 Uniform',
'shrink': 0.8
},
edgecolor='none',
vmin=-vmax,
vmax=vmax
)
else:
scatter = ax.scatter(
plot_gdf.geometry.x, plot_gdf.geometry.y,
c=plot_gdf['wse_diff'],
cmap='RdBu_r', s=2, alpha=0.8,
vmin=-vmax, vmax=vmax
)
cbar = plt.colorbar(scatter, ax=ax, shrink=0.8)
cbar.set_label('WSE Difference (ft): Gridded \u2212 Uniform', fontsize=10)
if mesh_areas_gdf is not None:
mesh_areas_gdf.boundary.plot(ax=ax, color='black', linewidth=1.5)
# Statistics annotation
stats_text = (
f"Difference: {diff_min:+.3f} to {diff_max:+.3f} ft\n"
f"Mean: {diff_mean:+.3f} ft | Std: {diff_std:.3f} ft\n"
f"Gridded higher: {(merged['wse_diff'] > 0.01).sum():,} cells\n"
f"Uniform higher: {(merged['wse_diff'] < -0.01).sum():,} cells"
)
ax.text(0.02, 0.98, stats_text, transform=ax.transAxes,
fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.85))
ax.set_title(
f'WSE Difference: Gridded \u2212 Uniform Precipitation\n'
f'{DEMO_AEP_NAME} {STORM_DURATION_HOURS}-hr Storm \u2014 {PROJECT_NAME}',
fontsize=13, fontweight='bold'
)
ax.set_xlabel('X Coordinate (ft)', fontsize=10)
ax.set_ylabel('Y Coordinate (ft)', fontsize=10)
ax.set_aspect('equal')
plt.tight_layout()
fig_path = output_folder / f"wse_diff_gridded_vs_uniform_{DEMO_AEP_NAME}_{STORM_DURATION_HOURS}hr.png"
plt.savefig(fig_path, dpi=150, bbox_inches='tight',
facecolor='white', edgecolor='none')
print(f"\n [OK] Figure saved: {fig_path.name}")
plt.show()
VARIANCE_MAP_AVAILABLE = True
except Exception as e:
print(f"\n[!!] Error creating WSE difference map: {e}")
import traceback
traceback.print_exc()
VARIANCE_MAP_AVAILABLE = False
7.2 Maximum Water Surface Elevation Map¶
This map visualizes the maximum water surface elevation (WSE) for each mesh cell from the executed HEC-RAS plan. This visualization:
- Shows flood extent - Which cells were inundated during the storm
- Shows flood depth variability - Distribution of WSE across the model domain
- Enables method comparison - When multiple plans are run with different temporal distributions
Engineering Value: - Identify areas of highest flood risk - Verify model behavior is reasonable - Compare results across different precipitation methods
# =============================================================================
# 7.2 Maximum WSE Map
# =============================================================================
print("="*80)
print("MAXIMUM WATER SURFACE ELEVATION MAP")
print("="*80)
# This visualization requires executed plan results
if not EXECUTE_PLAN:
print("\n[--] EXECUTE_PLAN is False")
print("\nThis visualization requires executed HEC-RAS results.")
print("To enable:")
print(" 1. Set EXECUTE_PLAN = True in the configuration cell")
print(" 2. Re-run the notebook")
METHOD_MAP_AVAILABLE = False
elif not (PROJECT_AVAILABLE and 'EXECUTION_SUCCESS' in dir() and EXECUTION_SUCCESS):
print("\n[--] No execution results available")
print("\nRun Part 6 (Plan Execution) first to generate results.")
METHOD_MAP_AVAILABLE = False
else:
print("\nCreating maximum WSE visualization...")
try:
from ras_commander.hdf import HdfResultsMesh, HdfMesh
output_folder = project_path / "precip_analysis"
output_folder.mkdir(exist_ok=True)
print(f" Output folder: {output_folder}")
plan_hdfs = list(project_path.glob(f"*.p{TEMPLATE_PLAN}.hdf"))
if not plan_hdfs:
print(f"\n[!!] Plan HDF not found for Plan {TEMPLATE_PLAN}")
METHOD_MAP_AVAILABLE = False
else:
plan_hdf = plan_hdfs[0]
print(f" Plan HDF: {plan_hdf.name}")
# Identify boundary condition for this plan
plan_row = ras.plan_df[ras.plan_df['plan_number'] == TEMPLATE_PLAN]
plan_title = plan_row['Plan Title'].values[0] if len(plan_row) > 0 else f"Plan {TEMPLATE_PLAN}"
bc_label = "Gridded Precipitation"
print(" Extracting maximum WSE from 2D mesh results...")
max_ws_gdf = HdfResultsMesh.get_mesh_max_ws(plan_hdf)
if max_ws_gdf is None or len(max_ws_gdf) == 0:
print("\n[!!] No 2D mesh WSE data found in results")
METHOD_MAP_AVAILABLE = False
else:
print(f" [OK] Extracted {len(max_ws_gdf):,} mesh cells")
# Get mesh cell polygons
print(" Extracting mesh cell polygons...")
cell_polygons = None
has_polygons = False
try:
cell_polygons = HdfMesh.get_mesh_cell_polygons(plan_hdf)
if cell_polygons is not None and len(cell_polygons) > 0:
has_polygons = True
print(f" [OK] Extracted {len(cell_polygons):,} polygons from plan HDF")
except Exception as e:
print(f" [--] Plan HDF polygon extraction failed: {e}")
if not has_polygons and geom_hdf is not None:
try:
cell_polygons = HdfMesh.get_mesh_cell_polygons(geom_hdf)
if cell_polygons is not None and len(cell_polygons) > 0:
has_polygons = True
print(f" [OK] Extracted {len(cell_polygons):,} polygons from geometry HDF")
except Exception as e:
print(f" [--] Geometry HDF polygon extraction failed: {e}")
if not has_polygons:
print(" [!!] No polygons available - using scatter plot fallback")
# Determine WSE column name
wse_col = None
for col in ['maximum_water_surface', 'max_ws', 'Maximum Water Surface']:
if col in max_ws_gdf.columns:
wse_col = col
break
if wse_col is None:
print(f"\n[!!] Cannot find WSE column in results")
print(f" Available columns: {list(max_ws_gdf.columns)}")
METHOD_MAP_AVAILABLE = False
else:
wse_values = max_ws_gdf[wse_col].dropna()
wse_min = wse_values.min()
wse_max = wse_values.max()
wse_mean = wse_values.mean()
wse_std = wse_values.std()
# 2-panel figure: Map + Summary Stats (no histogram)
fig, axes = plt.subplots(1, 2, figsize=(15, 7),
gridspec_kw={'width_ratios': [3, 1]})
# ===== Panel 1: Maximum WSE Map =====
ax1 = axes[0]
if has_polygons and cell_polygons is not None:
cell_polygons = cell_polygons.copy()
max_ws_gdf_copy = max_ws_gdf.copy()
cell_polygons['cell_id'] = cell_polygons['cell_id'].astype(int)
max_ws_gdf_copy['cell_id'] = max_ws_gdf_copy['cell_id'].astype(int)
merged = cell_polygons.merge(
max_ws_gdf_copy[['mesh_name', 'cell_id', wse_col]],
on=['mesh_name', 'cell_id'],
how='left'
)
valid_count = merged[wse_col].notna().sum()
if valid_count == 0:
print(f" [!!] Merge failed - no matching cells")
has_polygons = False
else:
print(f" [OK] Merged {valid_count:,} of {len(merged):,} polygons with WSE data")
merged.plot(
column=wse_col,
ax=ax1,
legend=True,
legend_kwds={'label': 'Max WSE (ft)', 'shrink': 0.7},
cmap='viridis',
edgecolor='none',
alpha=0.9,
vmin=wse_min,
vmax=wse_max
)
if not has_polygons:
scatter = ax1.scatter(
max_ws_gdf.geometry.x,
max_ws_gdf.geometry.y,
c=max_ws_gdf[wse_col],
cmap='viridis',
s=2,
alpha=0.8,
vmin=wse_min,
vmax=wse_max
)
cbar = plt.colorbar(scatter, ax=ax1, shrink=0.7)
cbar.set_label('Max WSE (ft)', fontsize=10)
if mesh_areas_gdf is not None:
mesh_areas_gdf.boundary.plot(ax=ax1, color='red', linewidth=1.5)
ax1.set_xlabel('X Coordinate (ft)', fontsize=10)
ax1.set_ylabel('Y Coordinate (ft)', fontsize=10)
ax1.set_title(
f'Maximum WSE \u2014 {bc_label}\n{DEMO_AEP_NAME} {STORM_DURATION_HOURS}-hr Storm (Plan {TEMPLATE_PLAN})',
fontsize=11, fontweight='bold'
)
ax1.set_aspect('equal')
ax1.tick_params(axis='both', labelsize=8)
# ===== Panel 2: Summary Statistics =====
ax2 = axes[1]
ax2.axis('off')
inundated_count = (wse_values > 0).sum()
summary_lines = [
"MAXIMUM WSE ANALYSIS",
"=" * 35,
"",
"Boundary Condition:",
f" Type: {bc_label}",
f" Plan: {TEMPLATE_PLAN} ({plan_title})",
"",
"Storm Configuration:",
f" Event: {DEMO_AEP_NAME} ({DEMO_AEP_PERCENT}% AEP)",
f" Duration: {STORM_DURATION_HOURS} hours",
f" Total Depth: {TOTAL_DEPTH_INCHES:.2f} inches",
"",
"WSE Statistics:",
f" Minimum: {wse_min:.2f} ft",
f" Maximum: {wse_max:.2f} ft",
f" Mean: {wse_mean:.2f} ft",
f" Std Dev: {wse_std:.2f} ft",
"",
"Mesh Information:",
f" Total Cells: {len(max_ws_gdf):,}",
f" Inundated: {inundated_count:,}",
"",
"Data Source:",
f" HDF: {plan_hdf.name}",
]
summary_text = "\n".join(summary_lines)
ax2.text(0.05, 0.95, summary_text, transform=ax2.transAxes,
fontsize=10, verticalalignment='top',
fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='lightyellow',
alpha=0.8, edgecolor='gray'))
fig.suptitle(
f'HEC-RAS 2D Results \u2014 {PROJECT_NAME}',
fontsize=13, fontweight='bold', y=0.98
)
plt.tight_layout()
fig_path = output_folder / f"wse_map_{DEMO_AEP_NAME}_{STORM_DURATION_HOURS}hr.png"
plt.savefig(fig_path, dpi=150, bbox_inches='tight',
facecolor='white', edgecolor='none')
print(f"\n [OK] Figure saved: {fig_path.name}")
plt.show()
print(f"\n [OK] Maximum WSE map created successfully")
print(f" Boundary Condition: {bc_label} (Plan {TEMPLATE_PLAN})")
print(f"\n WSE Summary:")
print(f" Range: {wse_min:.2f} - {wse_max:.2f} ft")
print(f" Mean: {wse_mean:.2f} ft")
METHOD_MAP_AVAILABLE = True
except Exception as e:
print(f"\n[!!] Error creating WSE map: {e}")
import traceback
traceback.print_exc()
METHOD_MAP_AVAILABLE = False
7.3 Gridded vs Uniform Precipitation Comparison¶
This section compares Plan 06 (gridded precipitation) with Plan 07 (uniform precipitation reduced by 10%) to visualize where spatial variability in rainfall affects flooding predictions.
Why the 10% reduction? Over a small model domain with low spatial variance (<10%), gridded vs. uniform precipitation alone produces negligible WSE differences. The 10% reduction guarantees a measurable signal and proves that Plan 07 is actually using the uniform boundary-level hyetograph (if it were ignored, WSE would match Plan 06 exactly).
The comparison generates:
- Dominant Approach Map - Shows which approach produces higher max WSE at each mesh cell
- Maximum WSE Map - Displays the highest WSE from either approach
- WSE Difference Map - Shows
gridded_wse - uniform_wse(positive = gridded higher, negative = uniform higher)
# =============================================================================
# 7.3 Gridded vs Uniform Precipitation Comparison
# =============================================================================
print("="*80)
print("GRIDDED VS UNIFORM PRECIPITATION COMPARISON")
print("="*80)
if not EXECUTE_PLAN:
print("\n[--] EXECUTE_PLAN is False - skipping comparison")
print("\nThis visualization requires executed HEC-RAS results.")
print("Set EXECUTE_PLAN = True and re-run the notebook.")
elif not COMPARISON_PLAN_CREATED:
print("\n[--] Comparison plan not created - skipping")
print("\nTo enable comparison:")
print(" 1. Ensure storms_suite was generated in Part 3")
print(" 2. Re-run the notebook from Part 5")
elif not (PROJECT_AVAILABLE and 'EXECUTION_SUCCESS' in dir() and EXECUTION_SUCCESS):
print("\n[--] No execution results available")
print("\nRun Part 6 (Plan Execution) first to generate results.")
else:
print("\nComparing gridded vs uniform precipitation approaches...")
print("Generating 3 figures:\n")
print(" 1. Dominant Approach Map (gridded vs uniform)")
print(" 2. Maximum WSE Map (from dominant approach)")
print(" 3. WSE Difference Map (gridded - uniform)")
try:
# Ensure geopandas is available, handle missing import error gracefully:
try:
import geopandas as gpd
except ImportError:
print("\n[!!] geopandas (gpd) is required for this analysis but is not installed or not imported.")
print(" Install geopandas with:\n pip install geopandas")
raise
# Results are consolidated to original folder (v0.88.1+)
analysis_path = project_path
# Build plan_methods dict
plan_methods = {
'Gridded Precipitation': TEMPLATE_PLAN,
'Uniform Precipitation': UNIFORM_PLAN
}
print(f"\n Comparing {len(plan_methods)} approaches:")
for method, plan_num in plan_methods.items():
print(f" - {method} (Plan {plan_num})")
# Initialize a local RasPrj so plan_df reflects analysis_path
ras_local = init_ras_project(
analysis_path,
RAS_VERSION,
ras_object=RasPrj()
)
# Check for presence of both HDF files before analyzing
hdf_missing = []
for method, plan_num in plan_methods.items():
try:
hdf_path = ras_local.get_hdf_paths(plan_num)['results']
except ValueError:
hdf_missing.append((plan_num, method, None, "Plan not found"))
continue
if hdf_path is None:
hdf_missing.append(
(plan_num, method, None,
"HDF path not set (plan not computed?)")
)
continue
if not hdf_path.exists():
hdf_missing.append((plan_num, method, hdf_path, "HDF not found"))
continue
if hdf_missing:
for plan_num, method, hdf_path, reason in hdf_missing:
print(f"\n [!!] {reason} for {method} (Plan {plan_num})")
if hdf_path is not None:
print(f" Expected file: {hdf_path}")
print("\n [!!] Comparison aborted due to missing HDF result files.\n")
else:
# Analyze dominant method
print("\n Analyzing which approach produces higher WSE at each cell...")
result_gdf = analyze_dominant_method(
analysis_path,
plan_methods,
ras_object=ras_local
)
if result_gdf is not None and len(result_gdf) > 0:
print(f" [OK] Analyzed {len(result_gdf):,} mesh cells")
# Generate figures
output_folder = project_path / "precip_analysis"
generate_dominant_method_figures(
result_gdf,
f"{DEMO_AEP_NAME} {STORM_DURATION_HOURS}hr",
output_folder,
list(plan_methods.keys())
)
print(f"\n [OK] Comparison complete - figures saved to: {output_folder}")
# Summary statistics
counts = result_gdf['dominant_method'].value_counts()
print(f"\n Cell Distribution by Dominant Approach:")
for method, count in counts.items():
pct = 100 * count / len(result_gdf)
print(f" {method}: {count:,} cells ({pct:.1f}%)")
# If we have WSE difference, show summary
if 'wse_diff' in result_gdf.columns:
print(f"\n WSE Difference Summary (Gridded - Uniform):")
print(f" Min: {result_gdf['wse_diff'].min():.3f} ft")
print(f" Max: {result_gdf['wse_diff'].max():.3f} ft")
print(f" Mean: {result_gdf['wse_diff'].mean():.3f} ft")
print(f" Std Dev: {result_gdf['wse_diff'].std():.3f} ft")
else:
print("\n [!!] No mesh cell data available for comparison")
except Exception as e:
print(f"\n[!!] Error during comparison: {e}")
import traceback
traceback.print_exc()
print("\n" + "="*80)
Future Enhancement: Multi-Method Comparison Map¶
For comparing multiple precipitation methods (Atlas14Storm, FrequencyStorm, ScsTypeStorm, StormGenerator), you would:
- Run multiple plans - one for each temporal distribution method
- Extract max WSE from each plan's HDF file
- Compare cell-by-cell to identify which method produces maximum WSE
- Color code by "winning" method for visualization
Example workflow (conceptual):
# Run all methods with same depth
methods = ['Atlas14Storm', 'FrequencyStorm', 'ScsTypeStorm', 'StormGenerator']
method_plans = {'Atlas14Storm': '01', 'FrequencyStorm': '02', ...}
# Extract max WSE from each
all_wse = {}
for method, plan in method_plans.items():
hdf = project_path / f'Project.p{plan}.hdf'
all_wse[method] = HdfResultsMesh.get_mesh_max_ws(hdf)
# For each cell, find which method had highest WSE
winning_method = pd.DataFrame()
for cell_id in all_cells:
max_wse = -999
winner = None
for method in methods:
wse = all_wse[method].loc[cell_id, 'maximum_water_surface']
if wse > max_wse:
max_wse = wse
winner = method
winning_method.loc[cell_id, 'method'] = winner
# Color code by winning method
method_colors = {'Atlas14Storm': 'blue', 'FrequencyStorm': 'orange',
'ScsTypeStorm': 'green', 'StormGenerator': 'red'}
This analysis helps determine which temporal distribution is most conservative for each part of the model domain.
Part 8: Conclusion and Roadmap¶
# =============================================================================
# 8.1 Summary Statistics
# =============================================================================
print("="*80)
print("NOTEBOOK SUMMARY")
print("="*80)
print(f"\nProject: {PROJECT_NAME}")
print(f"Plan: {TEMPLATE_PLAN}")
if mesh_cells_gdf is not None:
print(f"\nMesh Analysis:")
print(f" Total cells: {len(mesh_cells_gdf):,}")
if storms_suite:
print(f"\nStorms Generated: {len(storms_suite)}")
for name in storms_suite.keys():
print(f" - {name}")
if VARIANCE_AVAILABLE:
print(f"\nSpatial Variance Assessment:")
max_var = variance_results['range_pct'].max()
print(f" Maximum variance: {max_var:.1f}%")
print(f" Threshold: {VARIANCE_THRESHOLD_PCT}%")
print(f" Uniform appropriate: {'Yes' if UNIFORM_APPROPRIATE else 'No'}")
print(f"\nExecution:")
print(f" Enabled: {EXECUTE_PLAN}")
if EXECUTE_PLAN:
print(f" Success: {'Yes' if EXECUTION_SUCCESS else 'No'}")
Conclusion¶
This notebook demonstrated gridded precipitation workflows for rain-on-grid 2D HEC-RAS modeling:
Current Capabilities¶
| Capability | Status | Method |
|---|---|---|
| Generate HMS-equivalent hyetographs | Available | Atlas14Storm, FrequencyStorm, ScsTypeStorm |
| Extract mesh cell locations | Available | HdfMesh.get_mesh_cell_points() |
| Assess spatial variance | Available | Atlas14Variance.analyze() |
| Determine uniformity | Available | Atlas14Variance.is_uniform_rainfall_appropriate() |
| Execute plan | Available | RasCmdr.compute_plan() |
| Extract 2D results | Available | HdfResultsMesh.get_mesh_max_ws() |
Roadmap: Atlas14 Gridded Integration (Direct)¶
| Capability | Status | Notes |
|---|---|---|
| Atlas14 PFE grid -> SHG NetCDF | In progress | Reproject to EPSG:5070, preserve native grid resolution (nearest) |
| HMS-equivalent temporal distribution on grid | In progress | Atlas14Storm or FrequencyStorm applied to PFE depth grid |
| Align NetCDF time axis to plan window | In progress | Plan Simulation Date must match NetCDF time range |
| Configure unsteady file for GDAL precip | In progress | RasUnsteady.set_gridded_precipitation |
| Compute and validate in HEC-RAS | Pending | Review compute messages, confirm precip import success |
Key Findings¶
- Spatial variance should be assessed before applying uniform rainfall
- If variance > 10%, consider spatially variable rainfall
- For BaldEagleCrkMulti2D, variance is typically < 10%, so uniform rainfall is appropriate
- HMS-equivalent methods conserve depth at 10^-6 precision
Related Notebooks¶
720_precipitation_methods_comprehensive.ipynb- Complete method comparison721_atlas14_comprehensive_workflow.ipynb- Hydrograph BC workflow (bulk execution)725_atlas14_spatial_variance.ipynb- Detailed spatial variance analysis