{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# Open-Meteo Historical Climate for Master Data Centers\n", "\n", "This notebook builds or refreshes `public.data_center_open_meteo_historical_climate`, a one-row-per-data-center climate summary table keyed by `master_id`.\n", "\n", "Design goals:\n", "- Joinable to `public.master_data_centers` through `master_id`.\n", "- Rerunnable as new data centers are added to the master table.\n", "- Uses the same `.env` and `PGWEB_*` connection pattern as `spatial_clustering_master_data_centers.ipynb`.\n", "- Fetches metrics available from the Open-Meteo Historical Weather API now, and leaves explicit placeholders for NOAA/FEMA/smoke additions later.\n", "\n", "Open-Meteo coverage in this notebook:\n", "\n", "| Requested variable | Notebook field(s) | Source status |\n", "| --- | --- | --- |\n", "| Mean annual temperature | `mean_annual_temperature_c` | Open-Meteo |\n", "| Mean summer temperature | `mean_summer_temperature_c` | Open-Meteo |\n", "| Wet bulb temperature | `mean_wet_bulb_temperature_c`, `max_wet_bulb_temperature_c`, `extreme_wet_bulb_days` | Open-Meteo |\n", "| Relative humidity | `mean_relative_humidity_pct` | Open-Meteo |\n", "| Cooling degree days | `cooling_degree_days_c`, `annual_cooling_degree_days_c_mean` | Derived from Open-Meteo |\n", "| Extreme heat days | `extreme_heat_days`, `annual_extreme_heat_days_mean` | Derived from Open-Meteo |\n", "| Diurnal temperature range | `mean_diurnal_temperature_range_c` | Derived from Open-Meteo |\n", "| Precipitation variability | `precipitation_total_mm`, `annual_precipitation_cv`, `wet_day_precipitation_p95_mm` | Derived from Open-Meteo |\n", "| Drought severity index | `drought_severity_index_source_status` | External source needed |\n", "| Windstorm frequency | `windstorm_days`, `annual_windstorm_days_mean` | Open-Meteo proxy using wind gust threshold |\n", "| Wildfire smoke exposure | `wildfire_smoke_exposure_source_status` | 
External source needed |\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "import json\n", "import os\n", "import time\n", "import urllib.error\n", "import urllib.parse\n", "import urllib.request\n", "from pathlib import Path\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import psycopg2\n", "from psycopg2 import sql\n", "from psycopg2.extras import execute_values\n", "\n", "pd.set_option('display.max_columns', 100)\n", "pd.set_option('display.max_rows', 120)\n", "\n", "print('pandas:', pd.__version__)\n", "print('numpy:', np.__version__)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2", "metadata": {}, "outputs": [], "source": [ "def load_env_file(env_path: str = '.env') -> None:\n", "    p = Path(env_path)\n", "    if not p.exists():\n", "        print(f'No {env_path} file found in {Path.cwd()}')\n", "        return\n", "\n", "    loaded = 0\n", "    for raw_line in p.read_text(encoding='utf-8').splitlines():\n", "        line = raw_line.strip()\n", "        if not line or line.startswith('#') or '=' not in line:\n", "            continue\n", "        key, value = line.split('=', 1)\n", "        key = key.strip()\n", "        value = value.strip().strip('\"').strip(\"'\")\n", "        if key and key not in os.environ:\n", "            os.environ[key] = value\n", "            loaded += 1\n", "    print(f'Loaded {loaded} env var(s) from {env_path}')\n", "\n", "\n", "def require_env(keys):\n", "    missing = [k for k in keys if not os.getenv(k)]\n", "    if missing:\n", "        raise EnvironmentError(\n", "            'Missing required env vars in notebook kernel: ' + ', '.join(missing) +\n", "            '.\\nSet them in this notebook, or add them to a .env file in this folder.'\n", "        )\n", "\n", "\n", "load_env_file('.env')\n", "required_keys = ['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD']\n", "require_env(required_keys)\n", "\n", "DB_NAME = os.getenv('PGDATABASE', 'data_centers')\n", "MASTER_TABLE = 'public.master_data_centers'\n", "CLIMATE_TABLE = 
'public.data_center_open_meteo_historical_climate'\n", "\n", "\n", "def get_conn():\n", "    return psycopg2.connect(\n", "        host=os.environ['PGWEB_HOST'],\n", "        port=os.environ['PGWEB_PORT'],\n", "        user=os.environ['PGWEB_USER'],\n", "        password=os.environ['PGWEB_PASSWORD'],\n", "        dbname=DB_NAME,\n", "    )\n", "\n", "\n", "with get_conn() as conn:\n", "    with conn.cursor() as cur:\n", "        cur.execute('select current_database(), current_user')\n", "        db, usr = cur.fetchone()\n", "        print('Connected to DB:', db)\n", "        print('As user:', usr)\n", "        cur.execute('create extension if not exists postgis')\n", "        cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n", "        if cur.fetchone()[0] is None:\n", "            raise RuntimeError(f'{MASTER_TABLE} does not exist. Run build_master_data_centers.py first.')\n", "        cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n", "        print(f'{MASTER_TABLE} rows:', f'{cur.fetchone()[0]:,}')\n" ] }, { "cell_type": "markdown", "id": "3", "metadata": {}, "source": [ "## Parameters\n", "\n", "The default climate normal period is 1991-2020. Open-Meteo can go farther back, but this period is a standard baseline and keeps each API response moderate.\n", "\n", "Set `REFRESH_EXISTING = True` when you want to recompute existing rows after changing thresholds, variables, dates, or source model. 
Leave it `False` to only pick up new `master_id` values added since the last run.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4", "metadata": {}, "outputs": [], "source": [ "OPEN_METEO_ARCHIVE_URL = 'https://archive-api.open-meteo.com/v1/archive'\n", "OPEN_METEO_MODEL = 'era5'\n", "\n", "START_DATE = '1991-01-01'\n", "END_DATE = '2020-12-31'\n", "TIMEZONE = 'UTC'\n", "\n", "REFRESH_EXISTING = False\n", "DRY_RUN = False\n", "MAX_POINTS = None  # use a small integer for testing, or None for all eligible data centers\n", "BATCH_SIZE = 1  # Public Open-Meteo archive limits are tight for 30-year daily histories\n", "REQUEST_SLEEP_SECONDS = 70.0\n", "REQUEST_TIMEOUT_SECONDS = 90\n", "MAX_RETRIES = 8\n", "RATE_LIMIT_SLEEP_SECONDS = 120\n", "\n", "CDD_BASE_C = 18.3\n", "EXTREME_HEAT_THRESHOLD_C = 35.0\n", "EXTREME_WET_BULB_THRESHOLD_C = 28.0\n", "WINDSTORM_GUST_THRESHOLD_MS = 17.2  # roughly 38.5 mph; adjust for your risk definition\n", "WET_DAY_THRESHOLD_MM = 1.0\n", "\n", "DAILY_VARIABLES = [\n", "    'temperature_2m_mean',\n", "    'temperature_2m_max',\n", "    'temperature_2m_min',\n", "    'relative_humidity_2m_mean',\n", "    'wet_bulb_temperature_2m_mean',\n", "    'wet_bulb_temperature_2m_max',\n", "    'precipitation_sum',\n", "    'wind_gusts_10m_max',\n", "    'soil_moisture_0_to_100cm_mean',\n", "]\n", "\n", "print(f'Period: {START_DATE} to {END_DATE}')\n", "print(f'Target table: {CLIMATE_TABLE}')\n", "print('Refresh existing rows:', REFRESH_EXISTING)\n" ] }, { "cell_type": "markdown", "id": "5", "metadata": {}, "source": [ "## Create Target Table" ] }, { "cell_type": "code", "execution_count": null, "id": "6", "metadata": {}, "outputs": [], "source": [ "CREATE_CLIMATE_TABLE_SQL = f'''\n", "create table if not exists {CLIMATE_TABLE} (\n", "    master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n", "    source text,\n", "    name text,\n", "    operator text,\n", "    city text,\n", "    state text,\n", "    country text,\n", "    longitude double 
precision not null,\n", "    latitude double precision not null,\n", "    geom geometry(Point, 4326),\n", "\n", "    open_meteo_model text not null,\n", "    climate_period_start date not null,\n", "    climate_period_end date not null,\n", "    api_timezone text not null,\n", "    api_grid_latitude double precision,\n", "    api_grid_longitude double precision,\n", "    api_elevation_m double precision,\n", "    api_utc_offset_seconds integer,\n", "    observation_days integer not null,\n", "    observation_years integer not null,\n", "    api_generationtime_ms double precision,\n", "\n", "    mean_annual_temperature_c double precision,\n", "    mean_summer_temperature_c double precision,\n", "    max_daily_temperature_c double precision,\n", "    mean_wet_bulb_temperature_c double precision,\n", "    max_wet_bulb_temperature_c double precision,\n", "    mean_relative_humidity_pct double precision,\n", "    cooling_degree_days_c double precision,\n", "    annual_cooling_degree_days_c_mean double precision,\n", "    extreme_heat_days integer,\n", "    annual_extreme_heat_days_mean double precision,\n", "    extreme_wet_bulb_days integer,\n", "    mean_diurnal_temperature_range_c double precision,\n", "    precipitation_total_mm double precision,\n", "    annual_precipitation_mm_mean double precision,\n", "    annual_precipitation_cv double precision,\n", "    wet_day_precipitation_p95_mm double precision,\n", "    windstorm_days integer,\n", "    annual_windstorm_days_mean double precision,\n", "    max_wind_gust_ms double precision,\n", "    mean_soil_moisture_0_to_100cm double precision,\n", "\n", "    drought_severity_index_source_status text not null default 'needs_external_source',\n", "    wildfire_smoke_exposure_source_status text not null default 'needs_external_source',\n", "    open_meteo_daily_variables text[] not null,\n", "    open_meteo_url text,\n", "    fetched_at timestamptz not null default now(),\n", "    updated_at timestamptz not null default now()\n", ");\n", "\n", "create index if not exists data_center_open_meteo_hist_geom_gix\n", "    on {CLIMATE_TABLE} 
using gist (geom);\n", "create index if not exists data_center_open_meteo_hist_state_idx\n", "    on {CLIMATE_TABLE} (state);\n", "create index if not exists data_center_open_meteo_hist_period_idx\n", "    on {CLIMATE_TABLE} (climate_period_start, climate_period_end);\n", "'''\n", "\n", "with get_conn() as conn:\n", "    with conn.cursor() as cur:\n", "        cur.execute(CREATE_CLIMATE_TABLE_SQL)\n", "        cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(CLIMATE_TABLE)))\n", "        existing = cur.fetchone()[0]\n", "        print(f'{CLIMATE_TABLE} currently has {existing:,} row(s)')\n" ] }, { "cell_type": "markdown", "id": "7", "metadata": {}, "source": [ "## Load Data Centers Needing Climate Metrics" ] }, { "cell_type": "code", "execution_count": null, "id": "8", "metadata": {}, "outputs": [], "source": [ "def load_target_points(refresh_existing=False, max_points=None):\n", "    exists_filter = ''\n", "    if not refresh_existing:\n", "        exists_filter = f'''\n", "            and not exists (\n", "                select 1\n", "                from {CLIMATE_TABLE} c\n", "                where c.master_id = m.master_id\n", "                  and c.open_meteo_model = %s\n", "                  and c.climate_period_start = %s::date\n", "                  and c.climate_period_end = %s::date\n", "            )\n", "        '''\n", "        params = [OPEN_METEO_MODEL, START_DATE, END_DATE]\n", "    else:\n", "        params = []\n", "\n", "    limit_sql = ''\n", "    if max_points is not None:\n", "        limit_sql = 'limit %s'\n", "        params.append(int(max_points))\n", "\n", "    query = f'''\n", "        select\n", "            m.master_id,\n", "            m.source,\n", "            m.name,\n", "            m.operator,\n", "            m.city,\n", "            m.state,\n", "            m.country,\n", "            m.longitude,\n", "            m.latitude,\n", "            ST_AsText(m.geom) as geom_wkt\n", "        from {MASTER_TABLE} m\n", "        where m.longitude is not null\n", "          and m.latitude is not null\n", "          and m.geom is not null\n", "          {exists_filter}\n", "        order by m.master_id\n", "        {limit_sql}\n", "    '''\n", "    with get_conn() as conn:\n", "        return pd.read_sql_query(query, conn, params=params)\n", "\n", "\n", "targets = 
load_target_points(refresh_existing=REFRESH_EXISTING, max_points=MAX_POINTS)\n", "print(f'Target data centers to fetch: {len(targets):,}')\n", "display(targets.head())\n" ] }, { "cell_type": "markdown", "id": "9", "metadata": {}, "source": [ "## Fetch and Summarize Open-Meteo Daily Data" ] }, { "cell_type": "code", "execution_count": null, "id": "10", "metadata": {}, "outputs": [], "source": [ "def format_coordinate_param(values):\n", "    if np.isscalar(values):\n", "        return f'{float(values):.6f}'\n", "    return ','.join(f'{float(value):.6f}' for value in values)\n", "\n", "\n", "def build_open_meteo_url(latitude, longitude):\n", "    params = {\n", "        'latitude': format_coordinate_param(latitude),\n", "        'longitude': format_coordinate_param(longitude),\n", "        'start_date': START_DATE,\n", "        'end_date': END_DATE,\n", "        'daily': ','.join(DAILY_VARIABLES),\n", "        'temperature_unit': 'celsius',\n", "        'wind_speed_unit': 'ms',\n", "        'precipitation_unit': 'mm',\n", "        'timezone': TIMEZONE,\n", "        'models': OPEN_METEO_MODEL,\n", "    }\n", "    return OPEN_METEO_ARCHIVE_URL + '?' 
+ urllib.parse.urlencode(params, safe=',')\n", "\n", "\n", "def retry_after_seconds(exc):\n", "    value = exc.headers.get('Retry-After') if getattr(exc, 'headers', None) else None\n", "    if value:\n", "        try:\n", "            return max(1, int(value))\n", "        except ValueError:\n", "            pass\n", "    return None\n", "\n", "\n", "def fetch_json(url):\n", "    last_error = None\n", "    for attempt in range(1, MAX_RETRIES + 1):\n", "        try:\n", "            request = urllib.request.Request(url, headers={'User-Agent': 'data-center-climate-notebook/1.0'})\n", "            with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:\n", "                return json.loads(response.read().decode('utf-8'))\n", "        except urllib.error.HTTPError as exc:\n", "            last_error = exc\n", "            try:\n", "                error_body = exc.read().decode('utf-8')\n", "            except Exception:\n", "                error_body = ''\n", "            if exc.code == 400:\n", "                raise RuntimeError(f'HTTP 400 Bad Request: {error_body or exc.reason}')\n", "            if exc.code == 429:\n", "                if 'hourly api request limit exceeded' in error_body.lower():\n", "                    raise RuntimeError(f'HTTP 429 Hourly API limit: {error_body}')\n", "                sleep_for = retry_after_seconds(exc) or min(RATE_LIMIT_SLEEP_SECONDS * attempt, 900)\n", "                detail = f': {error_body}' if error_body else ''\n", "                print(f'Open-Meteo rate limit on attempt {attempt}/{MAX_RETRIES}{detail}; sleeping {sleep_for}s')\n", "            else:\n", "                sleep_for = min(2 ** attempt, 60)\n", "                print(f'HTTP error on attempt {attempt}/{MAX_RETRIES}: {exc}; sleeping {sleep_for}s')\n", "            time.sleep(sleep_for)\n", "        except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:\n", "            last_error = exc\n", "            sleep_for = min(2 ** attempt, 60)\n", "            print(f'Fetch failed on attempt {attempt}/{MAX_RETRIES}: {exc}; sleeping {sleep_for}s')\n", "            time.sleep(sleep_for)\n", "    raise RuntimeError(f'Open-Meteo request failed after {MAX_RETRIES} attempt(s): {last_error}')\n", "\n", "\n", "def summarize_daily_weather(payload, dc_row, request_url):\n", "    daily = payload.get('daily', 
{})\n", "    if 'time' not in daily:\n", "        reason = payload.get('reason') or 'Open-Meteo response did not include daily.time'\n", "        raise RuntimeError(reason)\n", "\n", "    weather = pd.DataFrame(daily)\n", "    weather['time'] = pd.to_datetime(weather['time'])\n", "    weather['year'] = weather['time'].dt.year\n", "    weather['month'] = weather['time'].dt.month\n", "\n", "    numeric_cols = [c for c in weather.columns if c not in {'time', 'year', 'month'}]\n", "    for col in numeric_cols:\n", "        weather[col] = pd.to_numeric(weather[col], errors='coerce')\n", "\n", "    weather['cooling_degree_day_c'] = (weather['temperature_2m_mean'] - CDD_BASE_C).clip(lower=0)\n", "    weather['extreme_heat_day'] = weather['temperature_2m_max'] >= EXTREME_HEAT_THRESHOLD_C\n", "    weather['extreme_wet_bulb_day'] = weather['wet_bulb_temperature_2m_max'] >= EXTREME_WET_BULB_THRESHOLD_C\n", "    weather['diurnal_temperature_range_c'] = weather['temperature_2m_max'] - weather['temperature_2m_min']\n", "    weather['windstorm_day'] = weather['wind_gusts_10m_max'] >= WINDSTORM_GUST_THRESHOLD_MS\n", "    wet_days = weather.loc[weather['precipitation_sum'] >= WET_DAY_THRESHOLD_MM, 'precipitation_sum']\n", "\n", "    annual = (\n", "        weather.groupby('year', as_index=False)\n", "        .agg(\n", "            annual_mean_temperature_c=('temperature_2m_mean', 'mean'),\n", "            annual_cooling_degree_days_c=('cooling_degree_day_c', 'sum'),\n", "            annual_extreme_heat_days=('extreme_heat_day', 'sum'),\n", "            annual_precipitation_mm=('precipitation_sum', 'sum'),\n", "            annual_windstorm_days=('windstorm_day', 'sum'),\n", "        )\n", "    )\n", "    # Months 6-8 are meteorological summer in the Northern Hemisphere only.\n", "    summer = weather.loc[weather['month'].isin([6, 7, 8])]\n", "\n", "    annual_precip = annual['annual_precipitation_mm'].dropna()\n", "    precip_cv = np.nan\n", "    if len(annual_precip) > 1 and annual_precip.mean() != 0:\n", "        precip_cv = float(annual_precip.std(ddof=1) / annual_precip.mean())\n", "\n", "    return {\n", "        'master_id': dc_row.master_id,\n", "        'source': dc_row.source,\n", "        'name': dc_row.name,\n", "        'operator': 
dc_row.operator,\n", "        'city': dc_row.city,\n", "        'state': dc_row.state,\n", "        'country': dc_row.country,\n", "        'longitude': float(dc_row.longitude),\n", "        'latitude': float(dc_row.latitude),\n", "        'open_meteo_model': OPEN_METEO_MODEL,\n", "        'climate_period_start': START_DATE,\n", "        'climate_period_end': END_DATE,\n", "        'api_timezone': payload.get('timezone', TIMEZONE),\n", "        'api_grid_latitude': payload.get('latitude'),\n", "        'api_grid_longitude': payload.get('longitude'),\n", "        'api_elevation_m': payload.get('elevation'),\n", "        'api_utc_offset_seconds': payload.get('utc_offset_seconds'),\n", "        'observation_days': int(len(weather)),\n", "        'observation_years': int(weather['year'].nunique()),\n", "        'api_generationtime_ms': payload.get('generationtime_ms'),\n", "        'mean_annual_temperature_c': float(annual['annual_mean_temperature_c'].mean()),\n", "        'mean_summer_temperature_c': float(summer['temperature_2m_mean'].mean()) if len(summer) else np.nan,\n", "        'max_daily_temperature_c': float(weather['temperature_2m_max'].max()),\n", "        'mean_wet_bulb_temperature_c': float(weather['wet_bulb_temperature_2m_mean'].mean()),\n", "        'max_wet_bulb_temperature_c': float(weather['wet_bulb_temperature_2m_max'].max()),\n", "        'mean_relative_humidity_pct': float(weather['relative_humidity_2m_mean'].mean()),\n", "        'cooling_degree_days_c': float(weather['cooling_degree_day_c'].sum()),\n", "        'annual_cooling_degree_days_c_mean': float(annual['annual_cooling_degree_days_c'].mean()),\n", "        'extreme_heat_days': int(weather['extreme_heat_day'].sum()),\n", "        'annual_extreme_heat_days_mean': float(annual['annual_extreme_heat_days'].mean()),\n", "        'extreme_wet_bulb_days': int(weather['extreme_wet_bulb_day'].sum()),\n", "        'mean_diurnal_temperature_range_c': float(weather['diurnal_temperature_range_c'].mean()),\n", "        'precipitation_total_mm': float(weather['precipitation_sum'].sum()),\n", "        'annual_precipitation_mm_mean': float(annual['annual_precipitation_mm'].mean()),\n", "        
'annual_precipitation_cv': precip_cv,\n", "        'wet_day_precipitation_p95_mm': float(wet_days.quantile(0.95)) if len(wet_days) else 0.0,\n", "        'windstorm_days': int(weather['windstorm_day'].sum()),\n", "        'annual_windstorm_days_mean': float(annual['annual_windstorm_days'].mean()),\n", "        'max_wind_gust_ms': float(weather['wind_gusts_10m_max'].max()),\n", "        'mean_soil_moisture_0_to_100cm': float(weather['soil_moisture_0_to_100cm_mean'].mean()),\n", "        'drought_severity_index_source_status': 'needs_external_source',\n", "        'wildfire_smoke_exposure_source_status': 'needs_external_source',\n", "        'open_meteo_daily_variables': DAILY_VARIABLES,\n", "        'open_meteo_url': request_url,\n", "    }\n", "\n", "\n", "def is_request_too_large_error(exc):\n", "    text = str(exc).lower()\n", "    return 'http 400' in text and 'requests too much data' in text\n", "\n", "\n", "def fetch_batch_summaries(batch):\n", "    url = build_open_meteo_url(batch['latitude'].to_list(), batch['longitude'].to_list())\n", "    try:\n", "        payload = fetch_json(url)\n", "        payloads = payload if isinstance(payload, list) else [payload]\n", "        if len(payloads) != len(batch):\n", "            raise RuntimeError(f'Open-Meteo returned {len(payloads)} payload(s) for {len(batch)} requested location(s)')\n", "        rows = [\n", "            summarize_daily_weather(location_payload, dc_row, url)\n", "            for dc_row, location_payload in zip(batch.itertuples(index=False), payloads)\n", "        ]\n", "        return rows, []\n", "    except Exception as exc:\n", "        if is_request_too_large_error(exc) and len(batch) > 1:\n", "            midpoint = len(batch) // 2\n", "            print(f'Batch of {len(batch)} locations is too large; splitting into {midpoint} + {len(batch) - midpoint}')\n", "            left_rows, left_errors = fetch_batch_summaries(batch.iloc[:midpoint].copy())\n", "            time.sleep(REQUEST_SLEEP_SECONDS)\n", "            right_rows, right_errors = fetch_batch_summaries(batch.iloc[midpoint:].copy())\n", "            return left_rows + right_rows, left_errors + right_errors\n", "        batch_errors = [\n", "            {'master_id': 
dc_row.master_id, 'error': str(exc), 'url': url}\n", "            for dc_row in batch.itertuples(index=False)\n", "        ]\n", "        return [], batch_errors\n", "\n", "\n", "if targets.empty:\n", "    climate_rows = []\n", "    climate = pd.DataFrame()\n", "    print('No new target rows. Set REFRESH_EXISTING=True to recompute existing rows.')\n", "else:\n", "    climate_rows = []\n", "    errors = []\n", "    total = len(targets)\n", "    checkpoint_path = Path('output/open_meteo_historical_climate_checkpoint.csv')\n", "    checkpoint_path.parent.mkdir(exist_ok=True)\n", "\n", "    for batch_start in range(0, total, BATCH_SIZE):\n", "        batch = targets.iloc[batch_start:batch_start + BATCH_SIZE].copy()\n", "        batch_end = batch_start + len(batch)\n", "        batch_rows, batch_errors = fetch_batch_summaries(batch)\n", "        climate_rows.extend(batch_rows)\n", "        errors.extend(batch_errors)\n", "        if batch_errors:\n", "            print(f'ERROR batch {batch_start + 1:,}-{batch_end:,}: {batch_errors[0][\"error\"]}')\n", "            if '429' in batch_errors[0]['error'] or 'Too Many Requests' in batch_errors[0]['error'] or 'Hourly API limit' in batch_errors[0]['error']:\n", "                print('Open-Meteo rate limit reached; stopping this run so successes can be saved before retrying later.')\n", "                break\n", "\n", "        if climate_rows:\n", "            pd.DataFrame(climate_rows).to_csv(checkpoint_path, index=False)\n", "            print(f'Checkpointed {len(climate_rows):,} row(s) to {checkpoint_path}')\n", "        print(f'Processed {batch_end:,}/{total:,}; successes={len(climate_rows):,}; errors={len(errors):,}')\n", "        time.sleep(REQUEST_SLEEP_SECONDS)\n", "\n", "    climate = pd.DataFrame(climate_rows)\n", "    error_log = pd.DataFrame(errors)\n", "    print(f'Summarized {len(climate):,} climate row(s); errors={len(error_log):,}')\n", "    display(climate.head())\n", "    if not error_log.empty:\n", "        display(error_log)\n" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "## Upsert to PostGIS" ] }, { "cell_type": "code", "execution_count": null, "id": "12", "metadata": {}, "outputs": 
[], "source": [ "UPSERT_COLUMNS = [\n", "    'master_id', 'source', 'name', 'operator', 'city', 'state', 'country', 'longitude', 'latitude',\n", "    'open_meteo_model', 'climate_period_start', 'climate_period_end', 'api_timezone',\n", "    'api_grid_latitude', 'api_grid_longitude', 'api_elevation_m', 'api_utc_offset_seconds',\n", "    'observation_days', 'observation_years', 'api_generationtime_ms',\n", "    'mean_annual_temperature_c', 'mean_summer_temperature_c', 'max_daily_temperature_c',\n", "    'mean_wet_bulb_temperature_c', 'max_wet_bulb_temperature_c', 'mean_relative_humidity_pct',\n", "    'cooling_degree_days_c', 'annual_cooling_degree_days_c_mean',\n", "    'extreme_heat_days', 'annual_extreme_heat_days_mean', 'extreme_wet_bulb_days',\n", "    'mean_diurnal_temperature_range_c', 'precipitation_total_mm',\n", "    'annual_precipitation_mm_mean', 'annual_precipitation_cv', 'wet_day_precipitation_p95_mm',\n", "    'windstorm_days', 'annual_windstorm_days_mean', 'max_wind_gust_ms',\n", "    'mean_soil_moisture_0_to_100cm', 'drought_severity_index_source_status',\n", "    'wildfire_smoke_exposure_source_status', 'open_meteo_daily_variables', 'open_meteo_url',\n", "]\n", "\n", "\n", "def clean_db_value(value):\n", "    if isinstance(value, (list, tuple)):\n", "        return list(value)\n", "    if isinstance(value, float) and np.isnan(value):\n", "        return None\n", "    if pd.isna(value):\n", "        return None\n", "    return value\n", "\n", "\n", "def upsert_climate_rows(rows):\n", "    if not rows:\n", "        print('No rows to upsert.')\n", "        return\n", "\n", "    values = [\n", "        tuple(clean_db_value(row.get(col)) for col in UPSERT_COLUMNS)\n", "        for row in rows\n", "    ]\n", "    col_sql = sql.SQL(', ').join(map(sql.Identifier, UPSERT_COLUMNS))\n", "    update_assignments = sql.SQL(', ').join(\n", "        sql.SQL('{} = excluded.{}').format(sql.Identifier(col), sql.Identifier(col))\n", "        for col in UPSERT_COLUMNS\n", "        if col != 'master_id'\n", "    )\n", "\n", "    insert_sql = sql.SQL('''\n", "        insert into {table} ({cols}, geom, updated_at)\n", 
"        values %s\n", "        on conflict (master_id) do update set\n", "            {updates},\n", "            geom = excluded.geom,\n", "            fetched_at = now(),\n", "            updated_at = now()\n", "    ''').format(\n", "        table=sql.SQL(CLIMATE_TABLE),\n", "        cols=col_sql,\n", "        updates=update_assignments,\n", "    )\n", "\n", "    template = '(' + ', '.join(['%s'] * len(UPSERT_COLUMNS)) + ', ST_SetSRID(ST_MakePoint(%s, %s), 4326), now())'\n", "    values_with_geom = [row + (row[UPSERT_COLUMNS.index('longitude')], row[UPSERT_COLUMNS.index('latitude')]) for row in values]\n", "\n", "    with get_conn() as conn:\n", "        with conn.cursor() as cur:\n", "            execute_values(cur, insert_sql.as_string(conn), values_with_geom, template=template, page_size=500)\n", "            cur.execute(sql.SQL('analyze {}').format(sql.SQL(CLIMATE_TABLE)))\n", "    print(f'Upserted {len(rows):,} row(s) into {CLIMATE_TABLE}')\n", "\n", "\n", "if DRY_RUN:\n", "    print('DRY_RUN=True; database was not modified.')\n", "else:\n", "    upsert_climate_rows(climate_rows)\n" ] }, { "cell_type": "markdown", "id": "13", "metadata": {}, "source": [ "## Inspect Results" ] }, { "cell_type": "code", "execution_count": null, "id": "14", "metadata": {}, "outputs": [], "source": [ "summary_query = f'''\n", "select\n", "    count(*) as rows,\n", "    min(climate_period_start) as min_period_start,\n", "    max(climate_period_end) as max_period_end,\n", "    count(*) filter (where drought_severity_index_source_status = 'needs_external_source') as drought_external_needed,\n", "    count(*) filter (where wildfire_smoke_exposure_source_status = 'needs_external_source') as smoke_external_needed\n", "from {CLIMATE_TABLE}\n", "where open_meteo_model = %s\n", "  and climate_period_start = %s::date\n", "  and climate_period_end = %s::date\n", "'''\n", "\n", "sample_query = f'''\n", "select\n", "    c.master_id,\n", "    c.name,\n", "    c.state,\n", "    c.mean_annual_temperature_c,\n", "    c.mean_summer_temperature_c,\n", "    c.cooling_degree_days_c,\n", "    c.extreme_heat_days,\n", "    c.annual_precipitation_cv,\n", "    
c.windstorm_days\n", "from {CLIMATE_TABLE} c\n", "order by c.cooling_degree_days_c desc nulls last\n", "limit 20\n", "'''\n", "\n", "with get_conn() as conn:\n", "    summary = pd.read_sql_query(summary_query, conn, params=(OPEN_METEO_MODEL, START_DATE, END_DATE))\n", "    sample = pd.read_sql_query(sample_query, conn)\n", "\n", "display(summary)\n", "display(sample)\n" ] }, { "cell_type": "markdown", "id": "15", "metadata": {}, "source": [ "## Notes for the Next Data Sources\n", "\n", "Open-Meteo gets us the weather/climate burden metrics and a wind-gust proxy for windstorm frequency. These still need better external sources:\n", "\n", "- Drought severity index: NOAA/NCEI climate division or gridMET/USDM-derived drought measures are better candidates.\n", "- Wildfire smoke exposure: NOAA HMS smoke, EPA AirNow/monitor-derived PM2.5, or satellite smoke products are better candidates.\n", "- Windstorm frequency: the current field is a reanalysis gust threshold proxy. FEMA/NCEI storm events can be added later as observed hazard-event counts.\n" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.14.5" } }, "nbformat": 4, "nbformat_minor": 5 }