From 4f3dbfc7f96e3436d84ba3ea8764f35ca873654a Mon Sep 17 00:00:00 2001 From: dadams Date: Fri, 22 May 2026 10:49:22 -0700 Subject: [PATCH] smoke and drought tables --- hms_smoke_data_centers.ipynb | 1228 +++++++++++++++++++++++++++++++ usdm_drought_data_centers.ipynb | 832 +++++++++++++++++++++ 2 files changed, 2060 insertions(+) create mode 100644 hms_smoke_data_centers.ipynb create mode 100644 usdm_drought_data_centers.ipynb diff --git a/hms_smoke_data_centers.ipynb b/hms_smoke_data_centers.ipynb new file mode 100644 index 0000000..1fb485e --- /dev/null +++ b/hms_smoke_data_centers.ipynb @@ -0,0 +1,1228 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# HMS Wildfire Smoke Exposure for Master Data Centers\n", + "\n", + "Builds four PostGIS tables from NOAA HMS smoke polygon products:\n", + "\n", + "1. **`public.hms_smoke_days`** — one row per observed HMS smoke product day, including days with zero smoke polygons. This is the denominator for percentages.\n", + "2. **`public.hms_smoke_daily`** — raw daily smoke polygons with HMS density (`Light`, `Medium`, `Heavy`).\n", + "3. **`public.data_center_hms_smoke_dc_day`** — per-DC daily exposure keyed by `(master_id, smoke_date)`, storing the maximum smoke density covering the DC point, or `0` for observed no smoke.\n", + "4. **`public.data_center_hms_smoke_exposure`** — per-DC summary keyed by `master_id`, joinable to `master_data_centers`.\n", + "\n", + "**HMS density scale used here:**\n", + "\n", + "| Rank | Density |\n", + "|---:|---|\n", + "| 0 | No smoke polygon contained the DC on an observed HMS day |\n", + "| 1 | Light or unspecified smoke density |\n", + "| 2 | Medium |\n", + "| 3 | Heavy |\n", + "\n", + "Historical backfill uses NOAA HMS annual shapefile bundles for completed years and daily shapefile ZIPs for the current year. The ArcGIS GeoJSON endpoint is optional and is used as a current-day refresh when it has features.\n", + "\n", + "Sources:\n", + "- Annual bundles: `https://satepsanone.nesdis.noaa.gov/pub/FIRE/web/HMS/Smoke_Polygons/Shapefile/Annual_Bundles/`\n", + "- Daily ZIPs: `https://satepsanone.nesdis.noaa.gov/pub/FIRE/web/HMS/Smoke_Polygons/Shapefile/YYYY/MM/hms_smokeYYYYMMDD.zip`\n", + "- Current ArcGIS smoke polygons: `NOAA_Satellite_Smoke_Detection_(v1)`" + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "import io\n", + "import os\n", + "import re\n", + "import time\n", + "import tempfile\n", + "import zipfile\n", + "from datetime import date, datetime, timedelta, timezone\n", + "from html.parser import HTMLParser\n", + "from pathlib import Path\n", + "from urllib.parse import urljoin\n", + "\n", + "import geopandas as gpd\n", + "import numpy as np\n", + "import pandas as pd\n", + "import psycopg2\n", + "import requests\n", + "from psycopg2 import sql\n", + "from psycopg2.extras import execute_values\n", + "from shapely import wkb as shapely_wkb\n", + "from shapely.geometry import MultiPolygon\n", + "\n", + "try:\n", + " from shapely.validation import make_valid\n", + "except ImportError:\n", + " make_valid = None\n", + "\n", + "pd.set_option('display.max_columns', 120)\n", + "pd.set_option('display.max_rows', 120)\n", + "\n", + "print('geopandas:', gpd.__version__)\n", + "print('pandas: ', pd.__version__)" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Database Connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "def load_env_file(env_path: str = '.env') -> None:\n", + " p = Path(env_path)\n", + " if not p.exists():\n", + " print(f'No {env_path} file found in {Path.cwd()}')\n", + " return\n", + " loaded = 0\n", + " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", + " line = raw_line.strip()\n", + " if not line or line.startswith('#') or '=' not in line:\n", + " continue\n", + " key, value = line.split('=', 1)\n", + " key = key.strip()\n", + " value = value.strip().strip('\"').strip(\"'\")\n", + " if key and key not in os.environ:\n", + " os.environ[key] = value\n", + " loaded += 1\n", + " print(f'Loaded {loaded} env var(s) from {env_path}')\n", + "\n", + "\n", + "def require_env(keys):\n", + " missing = [k for k in keys if not os.getenv(k)]\n", + " if missing:\n", + " raise EnvironmentError('Missing required env vars: ' + ', '.join(missing))\n", + "\n", + "\n", + "load_env_file('.env')\n", + "require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])\n", + "\n", + "MASTER_TABLE = 'public.master_data_centers'\n", + "DAYS_TABLE = 'public.hms_smoke_days'\n", + "SMOKE_TABLE = 'public.hms_smoke_daily'\n", + "DC_DAY_TABLE = 'public.data_center_hms_smoke_dc_day'\n", + "EXPOSURE_TABLE = 'public.data_center_hms_smoke_exposure'\n", + "\n", + "\n", + "def get_conn():\n", + " return psycopg2.connect(\n", + " host=os.environ['PGWEB_HOST'],\n", + " port=os.environ['PGWEB_PORT'],\n", + " user=os.environ['PGWEB_USER'],\n", + " password=os.environ['PGWEB_PASSWORD'],\n", + " dbname='data_centers',\n", + " )\n", + "\n", + "\n", + "with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute('select current_database(), current_user')\n", + " print('Connected:', cur.fetchone())\n", + " cur.execute('create extension if not exists postgis')\n", + " cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n", + " if cur.fetchone()[0] is None:\n", + " raise RuntimeError(f'{MASTER_TABLE} missing')\n", + " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n", + " print(f'{MASTER_TABLE}: {cur.fetchone()[0]:,} rows')" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "## Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "HMS_SHAPEFILE_BASE_URL = (\n", + " 'https://satepsanone.nesdis.noaa.gov/pub/FIRE/web/HMS/'\n", + " 'Smoke_Polygons/Shapefile/'\n", + ")\n", + "ANNUAL_BUNDLE_BASE_URL = urljoin(HMS_SHAPEFILE_BASE_URL, 'Annual_Bundles/')\n", + "\n", + "ARCGIS_SMOKE_URL = (\n", + " 'https://services2.arcgis.com/C8EMgrsFcRFL6LrL/arcgis/rest/services/'\n", + " 'NOAA_Satellite_Smoke_Detection_%28v1%29/FeatureServer/0/query'\n", + " '?where=1%3D1&outFields=*&f=geojson'\n", + ")\n", + "\n", + "CACHE_DIR = Path('data/hms_smoke')\n", + "ANNUAL_CACHE_DIR = CACHE_DIR / 'annual'\n", + "DAILY_CACHE_DIR = CACHE_DIR / 'daily'\n", + "\n", + "# HMS smoke polygon archive starts at 2005 in the NOAA directory.\n", + "START_YEAR = 2005\n", + "END_DATE = date.today()\n", + "\n", + "# Completed years are fastest from annual bundles. Current year is loaded\n", + "# from daily ZIPs so empty/no-smoke days are retained and today's partial\n", + "# product can be picked up.\n", + "LOAD_ANNUAL_BUNDLES = True\n", + "LOAD_CURRENT_YEAR_DAILY_ZIPS = True\n", + "LOAD_CURRENT_ARCGIS = True\n", + "\n", + "# If True, drop and rebuild all HMS smoke tables.\n", + "# If False, skip days already present in hms_smoke_days.\n", + "RELOAD_SMOKE = False\n", + "\n", + "# If True, rebuild the DC-day and summary exposure tables after loading source data.\n", + "RECOMPUTE_SUMMARY = True\n", + "\n", + "REQUEST_TIMEOUT = 90\n", + "USER_AGENT = 'data-center-hms-smoke-loader/1.0'\n", + "\n", + "DENSITY_RANK = {\n", + " 'na': 1,\n", + " 'n/a': 1,\n", + " 'unknown': 1,\n", + " 'unspecified': 1,\n", + " 'light': 1,\n", + " 'medium': 2,\n", + " 'heavy': 3,\n", + "}\n", + "RANK_DENSITY = {0: 'None', 1: 'Light/Unspecified', 2: 'Medium', 3: 'Heavy'}\n", + "\n", + "print(f'HMS load window: {START_YEAR}-01-01 through {END_DATE}')\n", + "print(f'Annual bundles through: {END_DATE.year - 1}')\n", + "print(f'Current-year daily ZIPs: {LOAD_CURRENT_YEAR_DAILY_ZIPS}')\n", + "print(f'ArcGIS current refresh: {LOAD_CURRENT_ARCGIS}')" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Create Tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "CREATE_DAYS_SQL = f'''\n", + "create table if not exists {DAYS_TABLE} (\n", + " smoke_date date primary key,\n", + " source text not null,\n", + " source_file text,\n", + " source_url text,\n", + " feature_count integer not null default 0,\n", + " fetched_at timestamptz not null default now(),\n", + " updated_at timestamptz not null default now()\n", + ");\n", + "'''\n", + "\n", + "CREATE_SMOKE_SQL = f'''\n", + "create table if not exists {SMOKE_TABLE} (\n", + " id bigserial primary key,\n", + " smoke_date date not null references {DAYS_TABLE}(smoke_date) on delete cascade,\n", + " satellite text,\n", + " start_raw text,\n", + " end_raw text,\n", + " start_utc timestamptz,\n", + " end_utc timestamptz,\n", + " density text,\n", + " density_rank smallint not null check (density_rank between 1 and 3),\n", + " source text not null,\n", + " source_file text,\n", + " source_url text,\n", + " geom geometry(MultiPolygon, 4326) not null,\n", + " fetched_at timestamptz not null default now()\n", + ");\n", + "\n", + "create index if not exists hms_smoke_daily_geom_gix\n", + " on {SMOKE_TABLE} using gist (geom);\n", + "create index if not exists hms_smoke_daily_date_idx\n", + " on {SMOKE_TABLE} (smoke_date);\n", + "create index if not exists hms_smoke_daily_density_idx\n", + " on {SMOKE_TABLE} (density_rank);\n", + "'''\n", + "\n", + "CREATE_DC_DAY_SQL = f'''\n", + "create table if not exists {DC_DAY_TABLE} (\n", + " master_id text not null references public.master_data_centers(master_id) on delete cascade,\n", + " smoke_date date not null references {DAYS_TABLE}(smoke_date) on delete cascade,\n", + " max_density_rank smallint not null check (max_density_rank between 0 and 3),\n", + " polygon_hits integer not null default 0,\n", + " primary key (master_id, smoke_date)\n", + ");\n", + "\n", + "create index if not exists data_center_hms_smoke_dc_day_date_idx\n", + " on {DC_DAY_TABLE} (smoke_date);\n", + "create index if not exists data_center_hms_smoke_dc_day_density_idx\n", + " on {DC_DAY_TABLE} (max_density_rank);\n", + "'''\n", + "\n", + "CREATE_EXPOSURE_SQL = f'''\n", + "create table if not exists {EXPOSURE_TABLE} (\n", + " master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n", + " source text, name text, operator text, city text, state text, country text,\n", + " longitude double precision, latitude double precision,\n", + " geom geometry(Point, 4326),\n", + "\n", + " hms_status text not null, -- 'observed', 'no_observed_days', or 'no_geom'\n", + " smoke_period_start date,\n", + " smoke_period_end date,\n", + " days_observed integer not null default 0,\n", + "\n", + " days_with_any_smoke integer not null default 0,\n", + " days_with_light_or_worse integer not null default 0,\n", + " days_with_medium_or_worse integer not null default 0,\n", + " days_with_heavy_smoke integer not null default 0,\n", + "\n", + " pct_days_with_any_smoke double precision,\n", + " pct_days_with_light_or_worse double precision,\n", + " pct_days_with_medium_or_worse double precision,\n", + " pct_days_with_heavy_smoke double precision,\n", + "\n", + " worst_density_rank smallint,\n", + " worst_density text,\n", + " mean_density_rank double precision,\n", + "\n", + " longest_any_smoke_streak_days integer,\n", + " longest_medium_or_heavy_streak_days integer,\n", + " longest_heavy_smoke_streak_days integer,\n", + "\n", + " fetched_at timestamptz not null default now(),\n", + " updated_at timestamptz not null default now()\n", + ");\n", + "\n", + "create index if not exists data_center_hms_smoke_exposure_state_idx\n", + " on {EXPOSURE_TABLE} (state);\n", + "create index if not exists data_center_hms_smoke_exposure_geom_gix\n", + " on {EXPOSURE_TABLE} using gist (geom);\n", + "create index if not exists data_center_hms_smoke_exposure_worst_idx\n", + " on {EXPOSURE_TABLE} (worst_density_rank);\n", + "'''\n", + "\n", + "with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " if RELOAD_SMOKE:\n", + " cur.execute(f'drop table if exists {EXPOSURE_TABLE} cascade')\n", + " cur.execute(f'drop table if exists {DC_DAY_TABLE} cascade')\n", + " cur.execute(f'drop table if exists {SMOKE_TABLE} cascade')\n", + " cur.execute(f'drop table if exists {DAYS_TABLE} cascade')\n", + "\n", + " cur.execute(CREATE_DAYS_SQL)\n", + " cur.execute(CREATE_SMOKE_SQL)\n", + " cur.execute(CREATE_DC_DAY_SQL)\n", + " cur.execute(CREATE_EXPOSURE_SQL)\n", + "\n", + " for t in (DAYS_TABLE, SMOKE_TABLE, DC_DAY_TABLE, EXPOSURE_TABLE):\n", + " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(t)))\n", + " print(f'{t}: {cur.fetchone()[0]:,} rows')\n", + " conn.commit()" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## HMS Download and Normalization Helpers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "class LinkParser(HTMLParser):\n", + " def __init__(self):\n", + " super().__init__()\n", + " self.links = []\n", + "\n", + " def handle_starttag(self, tag, attrs):\n", + " if tag.lower() != 'a':\n", + " return\n", + " attrs = dict(attrs)\n", + " href = attrs.get('href')\n", + " if href:\n", + " self.links.append(href)\n", + "\n", + "\n", + "DATE_RX = re.compile(r'hms_smoke(\\d{8})', re.IGNORECASE)\n", + "YEAR_RX = re.compile(r'hms_smoke(\\d{4})\\.zip$', re.IGNORECASE)\n", + "HMS_TIME_RX = re.compile(r'^\\s*(\\d{4})(\\d{3})\\s+(\\d{4})\\s*$')\n", + "\n", + "\n", + "def http_get(url: str, *, stream: bool = False, attempts: int = 4):\n", + " last_exc = None\n", + " for attempt in range(1, attempts + 1):\n", + " try:\n", + " r = requests.get(\n", + " url,\n", + " headers={'User-Agent': USER_AGENT},\n", + " timeout=REQUEST_TIMEOUT,\n", + " stream=stream,\n", + " )\n", + " r.raise_for_status()\n", + " return r\n", + " except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as exc:\n", + " last_exc = exc\n", + " if attempt == attempts:\n", + " raise\n", + " wait_s = min(30, 2 ** attempt)\n", + " print(f' HTTP retry {attempt}/{attempts} for {url}: {exc}; sleeping {wait_s}s')\n", + " time.sleep(wait_s)\n", + " raise last_exc\n", + "\n", + "\n", + "def list_links(url: str) -> list[str]:\n", + " parser = LinkParser()\n", + " parser.feed(http_get(url).text)\n", + " return [urljoin(url, href) for href in parser.links if href not in ('../',)]\n", + "\n", + "\n", + "def download_if_needed(url: str, dest: Path) -> tuple[Path, bool]:\n", + " dest.parent.mkdir(parents=True, exist_ok=True)\n", + " if dest.exists() and dest.stat().st_size > 0:\n", + " return dest, False\n", + "\n", + " tmp = dest.with_suffix(dest.suffix + '.part')\n", + " if tmp.exists():\n", + " tmp.unlink()\n", + "\n", + " with http_get(url, stream=True) as r:\n", + " with tmp.open('wb') as f:\n", + " for chunk in r.iter_content(chunk_size=1024 * 1024):\n", + " if chunk:\n", + " f.write(chunk)\n", + " tmp.replace(dest)\n", + " return dest, True\n", + "\n", + "\n", + "def parse_smoke_date_from_name(name: str) -> date | None:\n", + " m = DATE_RX.search(name)\n", + " if not m:\n", + " return None\n", + " s = m.group(1)\n", + " return date(int(s[:4]), int(s[4:6]), int(s[6:8]))\n", + "\n", + "\n", + "def parse_hms_timestamp(value) -> datetime | None:\n", + " if value is None or pd.isna(value):\n", + " return None\n", + " s = str(value).strip()\n", + " m = HMS_TIME_RX.match(s)\n", + " if not m:\n", + " return None\n", + " year, jday, hhmm = m.groups()\n", + " return datetime.strptime(f'{year}{jday}{hhmm}', '%Y%j%H%M').replace(tzinfo=timezone.utc)\n", + "\n", + "\n", + "def daterange(start: date, end: date):\n", + " d = start\n", + " while d <= end:\n", + " yield d\n", + " d += timedelta(days=1)\n", + "\n", + "\n", + "def normalize_density_label(value) -> str | None:\n", + " if value is None or pd.isna(value):\n", + " return None\n", + " raw = str(value).strip()\n", + " if not raw:\n", + " return None\n", + " if raw.lower() in ('na', 'n/a', 'unknown', 'unspecified'):\n", + " return 'Unspecified'\n", + " return raw.title()\n", + "\n", + "\n", + "def density_to_rank(value) -> int | None:\n", + " label = normalize_density_label(value)\n", + " if label is None:\n", + " return None\n", + " return DENSITY_RANK.get(label.lower())\n", + "\n", + "\n", + "def clean_polygon_geometry(geom):\n", + " if geom is None or geom.is_empty:\n", + " return None\n", + "\n", + " if not geom.is_valid:\n", + " geom = make_valid(geom) if make_valid is not None else geom.buffer(0)\n", + "\n", + " if geom.geom_type == 'Polygon':\n", + " return MultiPolygon([geom])\n", + " if geom.geom_type == 'MultiPolygon':\n", + " return geom\n", + " if geom.geom_type == 'GeometryCollection':\n", + " parts = []\n", + " for part in geom.geoms:\n", + " if part.geom_type == 'Polygon':\n", + " parts.append(part)\n", + " elif part.geom_type == 'MultiPolygon':\n", + " parts.extend(part.geoms)\n", + " return MultiPolygon(parts) if parts else None\n", + " return None\n", + "\n", + "\n", + "def empty_smoke_gdf() -> gpd.GeoDataFrame:\n", + " return gpd.GeoDataFrame(\n", + " columns=['Satellite', 'Start', 'End', 'Density', 'geometry'],\n", + " geometry='geometry',\n", + " crs='EPSG:4326',\n", + " )\n", + "\n", + "\n", + "def read_smoke_zip(zip_path: Path) -> gpd.GeoDataFrame:\n", + " with tempfile.TemporaryDirectory() as td:\n", + " try:\n", + " with zipfile.ZipFile(zip_path) as zf:\n", + " zf.extractall(td)\n", + " except zipfile.BadZipFile as exc:\n", + " raise RuntimeError(f'Bad zip file: {zip_path}') from exc\n", + "\n", + " shp_files = list(Path(td).glob('*.shp'))\n", + " if not shp_files:\n", + " return empty_smoke_gdf()\n", + "\n", + " # Some older HMS shapefiles contain unclosed rings. Pyogrio/Shapely\n", + " # can repair these while reading instead of aborting the whole year.\n", + " gdf = gpd.read_file(shp_files[0], on_invalid='fix')\n", + " if gdf.crs is None:\n", + " gdf = gdf.set_crs('EPSG:4326')\n", + " elif str(gdf.crs).lower() not in ('epsg:4326', 'wgs 84'):\n", + " gdf = gdf.to_crs('EPSG:4326')\n", + " return gdf\n", + "\n", + "\n", + "def normalize_smoke_gdf(gdf: gpd.GeoDataFrame, source_file: str) -> gpd.GeoDataFrame:\n", + " fallback_date = parse_smoke_date_from_name(source_file)\n", + "\n", + " if 'End_' in gdf.columns and 'End' not in gdf.columns:\n", + " gdf = gdf.rename(columns={'End_': 'End'})\n", + "\n", + " for col in ('Satellite', 'Start', 'End', 'Density'):\n", + " if col not in gdf.columns:\n", + " gdf[col] = None\n", + "\n", + " if 'geometry' not in gdf.columns:\n", + " gdf = empty_smoke_gdf()\n", + "\n", + " if gdf.crs is None:\n", + " gdf = gdf.set_crs('EPSG:4326')\n", + " elif str(gdf.crs).lower() not in ('epsg:4326', 'wgs 84'):\n", + " gdf = gdf.to_crs('EPSG:4326')\n", + "\n", + " gdf = gdf.copy()\n", + " gdf['start_utc'] = gdf['Start'].map(parse_hms_timestamp)\n", + " gdf['end_utc'] = gdf['End'].map(parse_hms_timestamp)\n", + " gdf['smoke_date'] = gdf['start_utc'].map(lambda x: x.date() if x is not None else fallback_date)\n", + " gdf['density_rank'] = gdf['Density'].map(density_to_rank)\n", + "\n", + " gdf = gdf[gdf['smoke_date'].notna() & gdf['density_rank'].notna()].copy()\n", + " if gdf.empty:\n", + " return gpd.GeoDataFrame(\n", + " gdf,\n", + " geometry='geometry',\n", + " crs='EPSG:4326',\n", + " )\n", + "\n", + " gdf['geometry'] = gdf.geometry.map(clean_polygon_geometry)\n", + " gdf = gdf[gdf.geometry.notna() & ~gdf.geometry.is_empty].copy()\n", + " return gpd.GeoDataFrame(gdf, geometry='geometry', crs='EPSG:4326')\n", + "\n", + "\n", + "def existing_smoke_days() -> set[date]:\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select smoke_date from {DAYS_TABLE}')\n", + " return {r[0] for r in cur.fetchall()}" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "## PostGIS Insert Helpers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "def upsert_smoke_days(cur, rows):\n", + " if not rows:\n", + " return\n", + " execute_values(\n", + " cur,\n", + " f'''\n", + " insert into {DAYS_TABLE}\n", + " (smoke_date, source, source_file, source_url, feature_count)\n", + " values %s\n", + " on conflict (smoke_date) do update set\n", + " source = excluded.source,\n", + " source_file = excluded.source_file,\n", + " source_url = excluded.source_url,\n", + " feature_count = excluded.feature_count,\n", + " updated_at = now()\n", + " ''',\n", + " rows,\n", + " page_size=1000,\n", + " )\n", + "\n", + "\n", + "def insert_smoke_polygons(cur, gdf: gpd.GeoDataFrame, source: str, source_file: str, source_url: str) -> int:\n", + " if gdf.empty:\n", + " return 0\n", + "\n", + " rows = []\n", + " for _, r in gdf.iterrows():\n", + " geom = r.geometry\n", + " if geom is None or geom.is_empty:\n", + " continue\n", + " rows.append((\n", + " r['smoke_date'],\n", + " str(r.get('Satellite')) if pd.notna(r.get('Satellite')) else None,\n", + " str(r.get('Start')) if pd.notna(r.get('Start')) else None,\n", + " str(r.get('End')) if pd.notna(r.get('End')) else None,\n", + " r.get('start_utc'),\n", + " r.get('end_utc'),\n", + " normalize_density_label(r.get('Density')),\n", + " int(r['density_rank']),\n", + " source,\n", + " source_file,\n", + " source_url,\n", + " psycopg2.Binary(shapely_wkb.dumps(geom, hex=False)),\n", + " ))\n", + "\n", + " if not rows:\n", + " return 0\n", + "\n", + " execute_values(\n", + " cur,\n", + " f'''\n", + " insert into {SMOKE_TABLE} (\n", + " smoke_date, satellite, start_raw, end_raw, start_utc, end_utc,\n", + " density, density_rank, source, source_file, source_url, geom\n", + " )\n", + " values %s\n", + " ''',\n", + " rows,\n", + " template='(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_Multi(ST_GeomFromWKB(%s, 4326)))',\n", + " page_size=1000,\n", + " )\n", + " return len(rows)\n", + "\n", + "\n", + "def feature_count_rows(days: list[date], gdf: gpd.GeoDataFrame, source: str, source_file: str, source_url: str):\n", + " if gdf.empty:\n", + " counts = {}\n", + " else:\n", + " counts = gdf.groupby('smoke_date').size().to_dict()\n", + " return [(d, source, source_file, source_url, int(counts.get(d, 0))) for d in days]" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "## Load NOAA HMS Smoke Polygons" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "def load_annual_bundle(year: int, have_days: set[date]) -> tuple[int, int]:\n", + " year_start = date(year, 1, 1)\n", + " year_end = date(year, 12, 31)\n", + " daily_urls = discover_daily_zip_urls(year_start, year_end)\n", + " listed_days = set(daily_urls.keys())\n", + "\n", + " url = urljoin(ANNUAL_BUNDLE_BASE_URL, f'hms_smoke{year}.zip')\n", + " zip_path, downloaded = download_if_needed(url, ANNUAL_CACHE_DIR / f'hms_smoke{year}.zip')\n", + " gdf = normalize_smoke_gdf(read_smoke_zip(zip_path), zip_path.name)\n", + " gdf = gdf[(gdf['smoke_date'] >= year_start) & (gdf['smoke_date'] <= year_end)].copy()\n", + " polygon_days = set(gdf['smoke_date'].dropna().unique()) if not gdf.empty else set()\n", + " year_days = sorted(listed_days | polygon_days)\n", + "\n", + " if not year_days:\n", + " print(f' {year}: no observed daily ZIPs or polygons found; skipped')\n", + " return 0, 0\n", + "\n", + " if not RELOAD_SMOKE and all(d in have_days for d in year_days):\n", + " print(f' {year}: skipped; all {len(year_days)} days already loaded')\n", + " return 0, 0\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(\n", + " f'delete from {SMOKE_TABLE} where smoke_date >= %s and smoke_date <= %s',\n", + " (year_start, year_end),\n", + " )\n", + " upsert_smoke_days(\n", + " cur,\n", + " feature_count_rows(year_days, gdf, 'annual_bundle', zip_path.name, url),\n", + " )\n", + " n_polygons = insert_smoke_polygons(cur, gdf, 'annual_bundle', zip_path.name, url)\n", + " conn.commit()\n", + "\n", + " have_days.update(year_days)\n", + " print(f' {year}: {n_polygons:,} polygons, {len(year_days)} observed days, downloaded={downloaded}')\n", + " return n_polygons, len(year_days)\n", + "\n", + "\n", + "def discover_year_month_urls(year: int) -> dict[int, str]:\n", + " year_url = f'{HMS_SHAPEFILE_BASE_URL}{year}/'\n", + " try:\n", + " links = list_links(year_url)\n", + " except requests.HTTPError as exc:\n", + " print(f' Missing/unreadable year listing: {year_url} ({exc})')\n", + " return {}\n", + "\n", + " month_urls = {}\n", + " for link in links:\n", + " name = link.rstrip('/').rsplit('/', 1)[-1]\n", + " if re.fullmatch(r'\\d{2}', name):\n", + " month_urls[int(name)] = link if link.endswith('/') else link + '/'\n", + " return month_urls\n", + "\n", + "\n", + "def discover_daily_zip_urls(start_day: date, end_day: date) -> dict[date, str]:\n", + " urls = {}\n", + " month = date(start_day.year, start_day.month, 1)\n", + " last_month = date(end_day.year, end_day.month, 1)\n", + " month_url_cache = {}\n", + "\n", + " while month <= last_month:\n", + " if month.year not in month_url_cache:\n", + " month_url_cache[month.year] = discover_year_month_urls(month.year)\n", + " month_url = month_url_cache[month.year].get(month.month)\n", + " links = list_links(month_url) if month_url is not None else []\n", + "\n", + " for link in links:\n", + " name = link.rstrip('/').rsplit('/', 1)[-1]\n", + " if not name.lower().endswith('.zip'):\n", + " continue\n", + " d = parse_smoke_date_from_name(name)\n", + " if d is not None and start_day <= d <= end_day:\n", + " urls[d] = link\n", + "\n", + " if month.month == 12:\n", + " month = date(month.year + 1, 1, 1)\n", + " else:\n", + " month = date(month.year, month.month + 1, 1)\n", + "\n", + " return dict(sorted(urls.items()))\n", + "\n", + "\n", + "def load_daily_zip(day: date, url: str, have_days: set[date]) -> tuple[int, int]:\n", + " if not RELOAD_SMOKE and day in have_days:\n", + " return 0, 0\n", + "\n", + " zip_name = f'hms_smoke{day:%Y%m%d}.zip'\n", + " zip_path, downloaded = download_if_needed(url, DAILY_CACHE_DIR / f'{day:%Y}' / f'{day:%m}' / zip_name)\n", + " gdf = normalize_smoke_gdf(read_smoke_zip(zip_path), zip_path.name)\n", + " gdf = gdf[gdf['smoke_date'] == day].copy()\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'delete from {SMOKE_TABLE} where smoke_date = %s', (day,))\n", + " upsert_smoke_days(cur, feature_count_rows([day], gdf, 'daily_zip', zip_path.name, url))\n", + " n_polygons = insert_smoke_polygons(cur, gdf, 'daily_zip', zip_path.name, url)\n", + " conn.commit()\n", + "\n", + " have_days.add(day)\n", + " print(f' {day}: {n_polygons:,} polygons, downloaded={downloaded}')\n", + " return n_polygons, 1\n", + "\n", + "\n", + "def load_current_arcgis(have_days: set[date]) -> tuple[int, int]:\n", + " try:\n", + " gdf = gpd.read_file(ARCGIS_SMOKE_URL, on_invalid='fix')\n", + " except Exception as exc:\n", + " print(f'ArcGIS current smoke refresh skipped: {exc}')\n", + " return 0, 0\n", + "\n", + " source_file = f'arcgis_current_{END_DATE:%Y%m%d}.geojson'\n", + " gdf = normalize_smoke_gdf(gdf, source_file)\n", + "\n", + " if gdf.empty:\n", + " if END_DATE not in have_days:\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " upsert_smoke_days(cur, [(END_DATE, 'arcgis_current', source_file, ARCGIS_SMOKE_URL, 0)])\n", + " conn.commit()\n", + " have_days.add(END_DATE)\n", + " print(f'ArcGIS current: 0 polygons; marked {END_DATE} as observed')\n", + " return 0, 1\n", + " print('ArcGIS current: 0 polygons; existing observed day left unchanged')\n", + " return 0, 0\n", + "\n", + " days = sorted(gdf['smoke_date'].dropna().unique())\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " for d in days:\n", + " cur.execute(f'delete from {SMOKE_TABLE} where smoke_date = %s', (d,))\n", + " upsert_smoke_days(cur, feature_count_rows(days, gdf, 'arcgis_current', source_file, ARCGIS_SMOKE_URL))\n", + " n_polygons = insert_smoke_polygons(cur, gdf, 'arcgis_current', source_file, ARCGIS_SMOKE_URL)\n", + " conn.commit()\n", + "\n", + " have_days.update(days)\n", + " print(f'ArcGIS current: {n_polygons:,} polygons across {len(days)} day(s)')\n", + " return n_polygons, len(days)\n", + "\n", + "\n", + "t0 = time.time()\n", + "have_days = set() if RELOAD_SMOKE else existing_smoke_days()\n", + "print(f'Already loaded observed HMS days: {len(have_days):,}')\n", + "\n", + "total_polygons = 0\n", + "total_days = 0\n", + "\n", + "if LOAD_ANNUAL_BUNDLES:\n", + " annual_end_year = END_DATE.year - 1\n", + " for year in range(START_YEAR, annual_end_year + 1):\n", + " n_polys, n_days = load_annual_bundle(year, have_days)\n", + " total_polygons += n_polys\n", + " total_days += n_days\n", + "\n", + "if LOAD_CURRENT_YEAR_DAILY_ZIPS:\n", + " daily_start = max(date(END_DATE.year, 1, 1), date(START_YEAR, 1, 1))\n", + " if daily_start <= END_DATE:\n", + " daily_urls = discover_daily_zip_urls(daily_start, END_DATE)\n", + " print(f'Current-year daily ZIPs discovered: {len(daily_urls):,}')\n", + " for day, url in daily_urls.items():\n", + " n_polys, n_days = load_daily_zip(day, url, have_days)\n", + " total_polygons += n_polys\n", + " total_days += n_days\n", + "\n", + "if LOAD_CURRENT_ARCGIS:\n", + " n_polys, n_days = load_current_arcgis(have_days)\n", + " total_polygons += n_polys\n", + " total_days += n_days\n", + "\n", + "elapsed = time.time() - t0\n", + "print(f'\\nSource load done: +{total_polygons:,} polygons / +{total_days:,} observed days in {elapsed/60:.1f} min')\n", + "\n", + "with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select count(*), min(smoke_date), max(smoke_date) from {DAYS_TABLE}')\n", + " print('Observed days:', cur.fetchone())\n", + " cur.execute(f'select count(*), min(smoke_date), max(smoke_date) from {SMOKE_TABLE}')\n", + " print('Smoke polygons:', cur.fetchone())" + ] + }, + { + "cell_type": "markdown", + "id": "15", + "metadata": {}, + "source": [ + "## Compute Per-DC Daily Worst Smoke Density" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [], + "source": [ + "POPULATE_INSMOKE_YEAR_SQL = f'''\n", + "insert into {DC_DAY_TABLE} (master_id, smoke_date, max_density_rank, polygon_hits)\n", + "select\n", + " dc.master_id,\n", + " s.smoke_date,\n", + " max(s.density_rank)::smallint as max_density_rank,\n", + " count(*)::integer as polygon_hits\n", + "from {MASTER_TABLE} dc\n", + "join {SMOKE_TABLE} s\n", + " on dc.geom && s.geom\n", + " and st_intersects(dc.geom, s.geom)\n", + "where dc.geom is not null\n", + " and s.smoke_date >= %s::date\n", + " and s.smoke_date < %s::date\n", + "group by dc.master_id, s.smoke_date\n", + "'''\n", + "\n", + "POPULATE_NOSMOKE_YEAR_SQL = f'''\n", + "insert into {DC_DAY_TABLE} (master_id, smoke_date, max_density_rank, polygon_hits)\n", + "select\n", + " dc.master_id,\n", + " d.smoke_date,\n", + " 0::smallint as max_density_rank,\n", + " 0::integer as polygon_hits\n", + "from {MASTER_TABLE} dc\n", + "cross join {DAYS_TABLE} d\n", + "where dc.geom is not null\n", + " and d.smoke_date >= %s::date\n", + " and d.smoke_date < %s::date\n", + " and not exists (\n", + " select 1\n", + " from {DC_DAY_TABLE} dd\n", + " where dd.master_id = dc.master_id\n", + " and dd.smoke_date = d.smoke_date\n", + " )\n", + "'''\n", + "\n", + "if RECOMPUTE_SUMMARY:\n", + " t0 = time.time()\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select min(smoke_date), max(smoke_date) from {DAYS_TABLE}')\n", + " day_min, day_max = cur.fetchone()\n", + " if day_min is None:\n", + " raise RuntimeError(f'{DAYS_TABLE} is empty; load HMS smoke days before computing exposure')\n", + " print(f'Processing daily DC exposure from {day_min} through {day_max}')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute('set local statement_timeout = 0')\n", + " cur.execute(f'truncate {DC_DAY_TABLE}')\n", + "\n", + " for yr in range(day_min.year, day_max.year + 1):\n", + " year_start = date(yr, 1, 1)\n", + " year_end = date(yr + 1, 1, 1)\n", + "\n", + " t = time.time()\n", + " cur.execute(POPULATE_INSMOKE_YEAR_SQL, (year_start, year_end))\n", + " cur.execute(f'select count(*) from {DC_DAY_TABLE}')\n", + " smoke_rows = cur.fetchone()[0]\n", + " print(f' {yr}: smoke rows cumulative = {smoke_rows:>12,} ({time.time() - t:.1f}s)')\n", + "\n", + " t = time.time()\n", + " cur.execute(POPULATE_NOSMOKE_YEAR_SQL, (year_start, year_end))\n", + " cur.execute(f'select count(*) from {DC_DAY_TABLE}')\n", + " total_rows = cur.fetchone()[0]\n", + " print(f' {yr}: rows after no-smoke fill = {total_rows:>12,} ({time.time() - t:.1f}s)')\n", + "\n", + " cur.execute(f'analyze {DC_DAY_TABLE}')\n", + " conn.commit()\n", + "\n", + " print(f'\\nDone in {(time.time() - t0) / 60:.1f} min')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select count(distinct master_id), count(distinct smoke_date), count(*) from {DC_DAY_TABLE}')\n", + " n_dc, n_days, n_rows = cur.fetchone()\n", + " print(f' unique DCs={n_dc:,} unique days={n_days:,} rows={n_rows:,}')" + ] + }, + { + "cell_type": "markdown", + "id": "17", + "metadata": {}, + "source": [ + "## Compute Per-DC Exposure Summary" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [], + "source": [ + "POPULATE_EXPOSURE_SQL = f'''\n", + "truncate {EXPOSURE_TABLE};\n", + "\n", + "with per_dc as (\n", + " select\n", + " master_id,\n", + " count(*) as days_observed,\n", + " sum((max_density_rank >= 1)::int) as days_any,\n", + " sum((max_density_rank >= 1)::int) as days_light,\n", + " sum((max_density_rank >= 2)::int) as days_medium,\n", + " sum((max_density_rank >= 3)::int) as days_heavy,\n", + " max(max_density_rank) as worst_density_rank,\n", + " avg(max_density_rank::float) as mean_density_rank,\n", + " min(smoke_date) as period_start,\n", + " max(smoke_date) as period_end\n", + " from {DC_DAY_TABLE}\n", + " group by master_id\n", + "),\n", + "streaks as (\n", + " select\n", + " master_id,\n", + " max(case when threshold = 1 then run_len end) as longest_any,\n", + " max(case when threshold = 2 then run_len end) as longest_medium,\n", + " max(case when threshold = 3 then run_len end) as longest_heavy\n", + " from (\n", + " select master_id, threshold, count(*) as run_len\n", + " from (\n", + " select\n", + " master_id,\n", + " threshold,\n", + " smoke_date,\n", + " smoke_date - (row_number() over (partition by master_id, threshold order by smoke_date))::int as grp\n", + " from (\n", + " select master_id, smoke_date, 1::int as threshold\n", + " from {DC_DAY_TABLE}\n", + " where max_density_rank >= 1\n", + " union all\n", + " select master_id, smoke_date, 2::int as threshold\n", + " from {DC_DAY_TABLE}\n", + " where max_density_rank >= 2\n", + " union all\n", + " select master_id, smoke_date, 3::int as threshold\n", + " from {DC_DAY_TABLE}\n", + " where max_density_rank >= 3\n", + " ) threshold_days\n", + " ) grouped\n", + " group by master_id, threshold, grp\n", + " ) runs\n", + " group by master_id\n", + ")\n", + "insert into {EXPOSURE_TABLE} (\n", + " master_id, source, name, operator, city, state, country, longitude, latitude, geom,\n", + " hms_status, smoke_period_start, smoke_period_end, days_observed,\n", + " days_with_any_smoke, days_with_light_or_worse, days_with_medium_or_worse,\n", + " days_with_heavy_smoke,\n", + " pct_days_with_any_smoke, pct_days_with_light_or_worse,\n", + " pct_days_with_medium_or_worse, pct_days_with_heavy_smoke,\n", + " worst_density_rank, worst_density, mean_density_rank,\n", + " longest_any_smoke_streak_days,\n", + " longest_medium_or_heavy_streak_days,\n", + " longest_heavy_smoke_streak_days\n", + ")\n", + "select\n", + " dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,\n", + " dc.longitude, dc.latitude, dc.geom,\n", + " case\n", + " when dc.geom is null then 'no_geom'\n", + " when p.master_id is null then 'no_observed_days'\n", + " else 'observed'\n", + " end as hms_status,\n", + " p.period_start,\n", + " p.period_end,\n", + " coalesce(p.days_observed, 0),\n", + " coalesce(p.days_any, 0),\n", + " coalesce(p.days_light, 0),\n", + " coalesce(p.days_medium, 0),\n", + " coalesce(p.days_heavy, 0),\n", + " case when p.days_observed > 0 then p.days_any::float / p.days_observed end,\n", + " case when p.days_observed > 0 then p.days_light::float / p.days_observed end,\n", + " case when p.days_observed > 0 then p.days_medium::float / p.days_observed end,\n", + " case when p.days_observed > 0 then p.days_heavy::float / p.days_observed end,\n", + " p.worst_density_rank,\n", + " case p.worst_density_rank\n", + " when 3 then 'Heavy'\n", + " when 2 then 'Medium'\n", + " when 1 then 'Light/Unspecified'\n", + " when 0 then 'None'\n", + " else null\n", + " end as worst_density,\n", + " p.mean_density_rank,\n", + " coalesce(s.longest_any, 0),\n", + " coalesce(s.longest_medium, 0),\n", + " coalesce(s.longest_heavy, 0)\n", + "from {MASTER_TABLE} dc\n", + "left join per_dc p on p.master_id = dc.master_id\n", + "left join streaks s on s.master_id = dc.master_id;\n", + "\n", + "analyze {EXPOSURE_TABLE};\n", + "'''\n", + "\n", + "if RECOMPUTE_SUMMARY:\n", + " t = time.time()\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute('set local statement_timeout = 0')\n", + " cur.execute(POPULATE_EXPOSURE_SQL)\n", + " conn.commit()\n", + " print(f'Populated {EXPOSURE_TABLE} in {time.time() - t:.0f}s')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select hms_status, count(*) from {EXPOSURE_TABLE} group by hms_status order by 1')\n", + " print('Status distribution:')\n", + " for r in cur.fetchall():\n", + " print(' ', r)" + ] + }, + { + "cell_type": "markdown", + "id": "19", + "metadata": {}, + "source": [ + "## Inspect Results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [], + "source": [ + "summary_query = f'''\n", + "with observed as (\n", + " select * from {EXPOSURE_TABLE} where hms_status = 'observed'\n", + ")\n", + "select\n", + " count(*) as observed_dcs,\n", + " round(avg(pct_days_with_any_smoke * 100)::numeric, 1) as avg_pct_any_smoke,\n", + " round(avg(pct_days_with_medium_or_worse * 100)::numeric, 1) as avg_pct_medium_or_worse,\n", + " round(avg(pct_days_with_heavy_smoke * 100)::numeric, 2) as avg_pct_heavy,\n", + " round(avg(mean_density_rank)::numeric, 2) as avg_mean_density_rank,\n", + " max(longest_medium_or_heavy_streak_days) as max_medium_heavy_streak_days\n", + "from observed\n", + "'''\n", + "\n", + "by_state_query = f'''\n", + "select\n", + " state,\n", + " count(*) as n_dcs,\n", + " round(avg(pct_days_with_any_smoke * 100)::numeric, 1) as avg_pct_any_smoke,\n", + " round(avg(pct_days_with_medium_or_worse * 100)::numeric, 1) as avg_pct_medium_or_worse,\n", + " round(avg(pct_days_with_heavy_smoke * 100)::numeric, 2) as avg_pct_heavy,\n", + " round(avg(mean_density_rank)::numeric, 2) as avg_mean_density_rank,\n", + " max(worst_density_rank) as worst_density_rank,\n", + " round(avg(longest_medium_or_heavy_streak_days)::numeric, 1) as avg_medium_heavy_streak_days\n", + "from {EXPOSURE_TABLE}\n", + "where hms_status = 'observed'\n", + "group by state\n", + "order by avg_pct_medium_or_worse desc nulls last\n", + "limit 20\n", + "'''\n", + "\n", + "worst_query = f'''\n", + "select\n", + " master_id, name, state,\n", + " round((pct_days_with_any_smoke * 100)::numeric, 1) as pct_any_smoke,\n", + " round((pct_days_with_medium_or_worse * 100)::numeric, 1) as pct_medium_or_worse,\n", + " round((pct_days_with_heavy_smoke * 100)::numeric, 2) as pct_heavy,\n", + " days_with_heavy_smoke,\n", + " worst_density,\n", + " longest_medium_or_heavy_streak_days\n", + "from {EXPOSURE_TABLE}\n", + "where hms_status = 'observed'\n", + "order by mean_density_rank desc, pct_days_with_medium_or_worse desc\n", + "limit 20\n", + "'''\n", + "\n", + "coverage_query = f'''\n", + "select\n", + " min(smoke_date) as first_day,\n", + " max(smoke_date) as last_day,\n", + " count(*) as observed_days,\n", + " sum(feature_count) as source_polygons\n", + "from {DAYS_TABLE}\n", + "'''\n", + "\n", + "with get_conn() as conn:\n", + " coverage = pd.read_sql_query(coverage_query, conn)\n", + " summary = pd.read_sql_query(summary_query, conn)\n", + " by_state = pd.read_sql_query(by_state_query, conn)\n", + " worst = pd.read_sql_query(worst_query, conn)\n", + "\n", + "print('=== HMS source coverage ===')\n", + "display(coverage)\n", + "print('\\n=== Overall ===')\n", + "display(summary)\n", + "print('\\n=== Top states by avg medium-or-worse smoke share ===')\n", + "display(by_state)\n", + "print('\\n=== Top most smoke-exposed individual DCs ===')\n", + "display(worst)" + ] + }, + { + "cell_type": "markdown", + "id": "21", + "metadata": {}, + "source": [ + "## Notes\n", + "\n", + "- `max_density_rank = 0` in `data_center_hms_smoke_dc_day` means the HMS product day was observed but no smoke polygon contained the data center point.\n", + "- `pct_days_with_medium_or_worse` is the most useful headline metric if you want to avoid counting very diffuse/light or pre-density-classification plumes as strongly as thicker smoke.\n", + "- Older HMS polygons often use `Density = NA`; this notebook maps those to rank `1` so smoke presence is preserved without counting it as medium/heavy smoke.\n", + "- HMS smoke polygons are analyst-produced satellite interpretations, not ground-level PM2.5 readings. Treat them as smoke presence/intensity exposure, not measured air quality.\n", + "- The ArcGIS endpoint is only used as a current-day refresh. Historical backfills should use NOAA HMS ZIP archives because they are stable and include the daily product history.\n", + "- The fire detections endpoint is a different product. Use it for nearby active fire/ignition analyses, not for smoke plume exposure." + ] + }, + { + "cell_type": "markdown", + "id": "22", + "metadata": {}, + "source": [ + "## Tables Created\n", + "\n", + "This notebook creates four PostGIS tables for NOAA HMS smoke exposure analysis. The tables are designed to separate source observations, raw geometries, long-form data-center exposure, and the final per-site summary.\n", + "\n", + "| Table | Grain | Purpose |\n", + "|---|---|---|\n", + "| `public.hms_smoke_days` | One row per observed HMS product day | Denominator table for daily percentages, including days with zero smoke polygons. Stores `smoke_date`, source metadata, and `feature_count`. |\n", + "| `public.hms_smoke_daily` | One row per HMS smoke polygon | Raw smoke plume geometry table. Stores `smoke_date`, satellite/time fields, normalized `density`, `density_rank`, source metadata, and `geom`. |\n", + "| `public.data_center_hms_smoke_dc_day` | One row per `(master_id, smoke_date)` | Long-form daily exposure table for every data center on every observed HMS day. `max_density_rank = 0` means observed no smoke; `1`, `2`, and `3` represent light/unspecified, medium, and heavy smoke exposure. |\n", + "| `public.data_center_hms_smoke_exposure` | One row per `master_id` | Final per-data-center summary table joinable to `public.master_data_centers`. Includes location fields, observation status, smoke-period dates, exposure-day counts, percentage metrics, worst/mean density, and longest streak metrics. |\n", + "\n", + "Recommended use:\n", + "\n", + "- Use `public.data_center_hms_smoke_exposure` for most site-level analysis and ranking.\n", + "- Use `public.data_center_hms_smoke_dc_day` for time-series analysis, seasonal summaries, or custom thresholds.\n", + "- Use `public.hms_smoke_daily` when you need the original smoke plume geometries for mapping or spatial QA.\n", + "- Use `public.hms_smoke_days` whenever calculating percentages so no-smoke observed days remain in the denominator." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/usdm_drought_data_centers.ipynb b/usdm_drought_data_centers.ipynb new file mode 100644 index 0000000..da40f31 --- /dev/null +++ b/usdm_drought_data_centers.ipynb @@ -0,0 +1,832 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# USDM Drought Exposure for Master Data Centers\n", + "\n", + "Builds two tables from the US Drought Monitor weekly shapefiles in `USDM Shape Files/`:\n", + "\n", + "1. **`public.usdm_drought_weekly`** — raw weekly drought polygons (one row per `(week_date, dm_category)`). ~5 rows × ~1,350 weeks ≈ 6,800 polygons. Useful for later spatial queries (\"show me the drought map for Aug 2022\", county-level aggregation, etc.).\n", + "2. **`public.data_center_usdm_drought_exposure`** — per-DC summary keyed by `master_id`, joinable to `master_data_centers` and `data_center_historical_climate`.\n", + "\n", + "**USDM `DM` scale** (cumulative — D4 is contained within D3 is within D2…):\n", + "\n", + "| `DM` | Category | Label |\n", + "|---|---|---|\n", + "| 0 | D0 | Abnormally Dry |\n", + "| 1 | D1 | Moderate Drought |\n", + "| 2 | D2 | Severe Drought |\n", + "| 3 | D3 | Extreme Drought |\n", + "| 4 | D4 | Exceptional Drought |\n", + "\n", + "For each `(DC, week)` we record the **maximum `DM` category whose polygon contains the DC point** (or `-1` if no polygon contains it = no drought that week).\n", + "\n", + "**Coverage period:** 2000-01-04 → ~2025-mid (whatever the latest weekly file is). USDM started Jan 2000.\n", + "\n", + "**Coverage area:** CONUS + AK + HI + PR + US territories. DCs outside coverage get `usdm_status='no_coverage'` and zero weeks.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import io\n", + "import json\n", + "import os\n", + "import re\n", + "import time\n", + "import zipfile\n", + "from datetime import date\n", + "from pathlib import Path\n", + "\n", + "import geopandas as gpd\n", + "import numpy as np\n", + "import pandas as pd\n", + "import psycopg2\n", + "from psycopg2 import sql\n", + "from psycopg2.extras import execute_values\n", + "from shapely import wkb as shapely_wkb\n", + "\n", + "pd.set_option('display.max_columns', 100)\n", + "pd.set_option('display.max_rows', 120)\n", + "\n", + "print('geopandas:', gpd.__version__)\n", + "print('pandas: ', pd.__version__)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "def load_env_file(env_path: str = '.env') -> None:\n", + " p = Path(env_path)\n", + " if not p.exists():\n", + " print(f'No {env_path} file found in {Path.cwd()}')\n", + " return\n", + " loaded = 0\n", + " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", + " line = raw_line.strip()\n", + " if not line or line.startswith('#') or '=' not in line:\n", + " continue\n", + " key, value = line.split('=', 1)\n", + " key = key.strip()\n", + " value = value.strip().strip('\"').strip(\"'\")\n", + " if key and key not in os.environ:\n", + " os.environ[key] = value\n", + " loaded += 1\n", + " print(f'Loaded {loaded} env var(s) from {env_path}')\n", + "\n", + "\n", + "def require_env(keys):\n", + " missing = [k for k in keys if not os.getenv(k)]\n", + " if missing:\n", + " raise EnvironmentError(\n", + " 'Missing required env vars: ' + ', '.join(missing))\n", + "\n", + "\n", + "load_env_file('.env')\n", + "require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])\n", + "\n", + "MASTER_TABLE = 'public.master_data_centers'\n", + "WEEKLY_TABLE = 'public.usdm_drought_weekly'\n", + "DC_WEEK_TABLE = 'public.data_center_usdm_drought_dc_week'\n", + "EXPOSURE_TABLE = 'public.data_center_usdm_drought_exposure'\n", + "\n", + "\n", + "def get_conn():\n", + " return psycopg2.connect(\n", + " host=os.environ['PGWEB_HOST'],\n", + " port=os.environ['PGWEB_PORT'],\n", + " user=os.environ['PGWEB_USER'],\n", + " password=os.environ['PGWEB_PASSWORD'],\n", + " dbname='data_centers',\n", + " )\n", + "\n", + "\n", + "with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute('select current_database(), current_user')\n", + " print('Connected:', cur.fetchone())\n", + " cur.execute('create extension if not exists postgis')\n", + " cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n", + " if cur.fetchone()[0] is None:\n", + " raise RuntimeError(f'{MASTER_TABLE} missing')\n", + " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n", + " print(f'{MASTER_TABLE}: {cur.fetchone()[0]:,} rows')\n" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "USDM_DIR = Path('USDM Shape Files')\n", + "\n", + "# If True, drop and rebuild the weekly polygon table from scratch.\n", + "# If False, only insert weeks that aren't already in the table.\n", + "RELOAD_WEEKLY = False\n", + "\n", + "# If True, drop and rebuild the DC-week and exposure summary tables.\n", + "RECOMPUTE_SUMMARY = True\n", + "\n", + "assert USDM_DIR.exists(), f'{USDM_DIR.resolve()} not found'\n", + "yearly = sorted(USDM_DIR.glob('*_USDM_M.zip'))\n", + "print(f'Yearly zips: {len(yearly)}')\n", + "print(f' first: {yearly[0].name}')\n", + "print(f' last: {yearly[-1].name}')\n" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "## Create Tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "CREATE_WEEKLY_SQL = f'''\n", + "create table if not exists {WEEKLY_TABLE} (\n", + " id bigserial primary key,\n", + " week_date date not null,\n", + " dm_category smallint not null check (dm_category between 0 and 4),\n", + " objectid integer,\n", + " shape_leng double precision,\n", + " shape_area double precision,\n", + " geom geometry(MultiPolygon, 4326) not null\n", + ");\n", + "\n", + "create index if not exists usdm_drought_weekly_geom_gix\n", + " on {WEEKLY_TABLE} using gist (geom);\n", + "create index if not exists usdm_drought_weekly_date_idx\n", + " on {WEEKLY_TABLE} (week_date);\n", + "'''\n", + "\n", + "CREATE_DC_WEEK_SQL = f'''\n", + "create table if not exists {DC_WEEK_TABLE} (\n", + " master_id text not null references public.master_data_centers(master_id) on delete cascade,\n", + " week_date date not null,\n", + " worst_dm smallint not null, -- 0..4; -1 means observed week but no drought polygon contained the DC\n", + " primary key (master_id, week_date)\n", + ");\n", + "\n", + "create index if not exists dc_week_drought_date_idx\n", + " on {DC_WEEK_TABLE} (week_date);\n", + "create index if not exists dc_week_drought_worst_idx\n", + " on {DC_WEEK_TABLE} (worst_dm);\n", + "'''\n", + "\n", + "CREATE_EXPOSURE_SQL = f'''\n", + "create table if not exists {EXPOSURE_TABLE} (\n", + " master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n", + " source text, name text, operator text, city text, state text, country text,\n", + " longitude double precision, latitude double precision,\n", + " geom geometry(Point, 4326),\n", + "\n", + " usdm_status text not null, -- 'covered' or 'no_coverage'\n", + " drought_period_start date,\n", + " drought_period_end date,\n", + " weeks_observed integer not null default 0,\n", + "\n", + " weeks_in_d0_or_worse integer not null default 0,\n", + " weeks_in_d1_or_worse integer not null default 0,\n", + " weeks_in_d2_or_worse integer not null default 0,\n", + " weeks_in_d3_or_worse integer not null default 0,\n", + " weeks_in_d4 integer not null default 0,\n", + "\n", + " pct_weeks_in_d0_or_worse double precision,\n", + " pct_weeks_in_d1_or_worse double precision,\n", + " pct_weeks_in_d2_or_worse double precision,\n", + " pct_weeks_in_d3_or_worse double precision,\n", + " pct_weeks_in_d4 double precision,\n", + "\n", + " worst_dm_category smallint, -- max DM ever observed; null if usdm_status='no_coverage'\n", + " mean_dm_category double precision, -- avg DM, treating no-drought weeks as 0\n", + " longest_d0_streak_weeks integer,\n", + " longest_d2_streak_weeks integer,\n", + " longest_d3_streak_weeks integer,\n", + "\n", + " fetched_at timestamptz not null default now(),\n", + " updated_at timestamptz not null default now()\n", + ");\n", + "\n", + "create index if not exists data_center_usdm_exposure_state_idx\n", + " on {EXPOSURE_TABLE} (state);\n", + "create index if not exists data_center_usdm_exposure_geom_gix\n", + " on {EXPOSURE_TABLE} using gist (geom);\n", + "create index if not exists data_center_usdm_exposure_worst_idx\n", + " on {EXPOSURE_TABLE} (worst_dm_category);\n", + "'''\n", + "\n", + "with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " if RELOAD_WEEKLY:\n", + " cur.execute(f'drop table if exists {WEEKLY_TABLE} cascade')\n", + " cur.execute(f'drop table if exists {DC_WEEK_TABLE} cascade')\n", + " cur.execute(f'drop table if exists {EXPOSURE_TABLE} cascade')\n", + " cur.execute(CREATE_WEEKLY_SQL)\n", + " cur.execute(CREATE_DC_WEEK_SQL)\n", + " cur.execute(CREATE_EXPOSURE_SQL)\n", + " for t in (WEEKLY_TABLE, DC_WEEK_TABLE, EXPOSURE_TABLE):\n", + " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(t)))\n", + " print(f'{t}: {cur.fetchone()[0]:,} rows')\n" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Load Weekly Shapefiles into PostGIS\n", + "\n", + "For each yearly zip:\n", + "- Open it without extracting to disk.\n", + "- For each inner weekly zip (`USDM_YYYYMMDD_M.zip`), extract to a temp dir.\n", + "- Read the shapefile with `geopandas`.\n", + "- Re-project to EPSG:4326 if needed (USDM is already 4326).\n", + "- Insert (week_date, dm_category, geom) rows.\n", + "\n", + "`RELOAD_WEEKLY=False` skips weeks already present, so a re-run is cheap.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "DATE_RX = re.compile(r'USDM_(\\d{8})', re.IGNORECASE)\n", + "\n", + "\n", + "def parse_week_date_from_name(name: str) -> date:\n", + " m = DATE_RX.search(name)\n", + " if not m:\n", + " raise ValueError(f'No date in {name}')\n", + " s = m.group(1)\n", + " return date(int(s[:4]), int(s[4:6]), int(s[6:8]))\n", + "\n", + "\n", + "def existing_weeks() -> set:\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select distinct week_date from {WEEKLY_TABLE}')\n", + " return {r[0] for r in cur.fetchall()}\n", + "\n", + "\n", + "def load_one_weekly(inner_bytes: bytes, week_date_val: date, cur):\n", + " \"\"\"Extract one inner weekly zip in memory, read shapefile, insert rows.\"\"\"\n", + " # The inner zip contains .shp/.shx/.dbf/.prj — write all to a temp dir.\n", + " import tempfile\n", + " with tempfile.TemporaryDirectory() as td:\n", + " with zipfile.ZipFile(io.BytesIO(inner_bytes)) as zf:\n", + " zf.extractall(td)\n", + " shp_files = list(Path(td).glob('*.shp'))\n", + " if not shp_files:\n", + " raise RuntimeError(f'No .shp in {week_date_val}')\n", + " gdf = gpd.read_file(shp_files[0])\n", + " if gdf.crs is None:\n", + " gdf = gdf.set_crs('EPSG:4326')\n", + " elif str(gdf.crs).lower() not in ('epsg:4326', 'wgs 84'):\n", + " gdf = gdf.to_crs('EPSG:4326')\n", + "\n", + " rows = []\n", + " for _, r in gdf.iterrows():\n", + " geom = r['geometry']\n", + " if geom is None or geom.is_empty:\n", + " continue\n", + " # Coerce to MultiPolygon (some weeks have Polygon, some have MultiPolygon)\n", + " if geom.geom_type == 'Polygon':\n", + " from shapely.geometry import MultiPolygon\n", + " geom = MultiPolygon([geom])\n", + " rows.append((\n", + " week_date_val,\n", + " int(r['DM']),\n", + " int(r.get('OBJECTID', 0)) if pd.notna(r.get('OBJECTID')) else None,\n", + " float(r.get('Shape_Leng', 0.0)) if pd.notna(r.get('Shape_Leng')) else None,\n", + " float(r.get('Shape_Area', 0.0)) if pd.notna(r.get('Shape_Area')) else None,\n", + " psycopg2.Binary(shapely_wkb.dumps(geom, hex=False)),\n", + " ))\n", + "\n", + " execute_values(\n", + " cur,\n", + " f'''insert into {WEEKLY_TABLE} (week_date, dm_category, objectid, shape_leng, shape_area, geom)\n", + " values %s''',\n", + " rows,\n", + " template='(%s, %s, %s, %s, %s, ST_Multi(ST_GeomFromWKB(%s, 4326)))',\n", + " page_size=20,\n", + " )\n", + " return len(rows)\n", + "\n", + "\n", + "def load_all_weeklies():\n", + " have = set() if RELOAD_WEEKLY else existing_weeks()\n", + " print(f'Already in DB: {len(have):,} weeks')\n", + " total_inserted_weeks = 0\n", + " total_rows = 0\n", + " t_start = time.time()\n", + "\n", + " for yearly_path in sorted(USDM_DIR.glob('*_USDM_M.zip')):\n", + " with zipfile.ZipFile(yearly_path) as yz:\n", + " inner_zips = sorted(n for n in yz.namelist() if n.lower().endswith('.zip'))\n", + " year_inserted = 0\n", + " year_skipped = 0\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " for inner_name in inner_zips:\n", + " wd = parse_week_date_from_name(inner_name)\n", + " if wd in have:\n", + " year_skipped += 1\n", + " continue\n", + " inner_bytes = yz.read(inner_name)\n", + " n = load_one_weekly(inner_bytes, wd, cur)\n", + " have.add(wd)\n", + " total_inserted_weeks += 1\n", + " year_inserted += 1\n", + " total_rows += n\n", + " conn.commit()\n", + " elapsed = time.time() - t_start\n", + " print(f' {yearly_path.name}: +{year_inserted} weeks ({year_skipped} skipped) '\n", + " f'-- running total {total_inserted_weeks:,} weeks / {total_rows:,} rows / {elapsed:.0f}s')\n", + "\n", + " print(f'\\nDone: inserted {total_inserted_weeks:,} new weeks, {total_rows:,} polygon rows')\n", + "\n", + "\n", + "load_all_weeklies()\n" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Compute per-DC weekly worst-DM\n", + "\n", + "Single spatial join: for each `(DC, week)`, take the **max `dm_category`** of any drought polygon whose geometry contains the DC point.\n", + "\n", + "The CROSS JOIN with `all_weeks` then LEFT JOIN ensures that weeks where the DC was in *no* drought polygon still get a row (with `worst_dm = -1`), which is required for correct percentage and streak calculations.\n", + "\n", + "PostGIS uses the GIST index on `usdm_drought_weekly.geom` plus the btree on `week_date` to make this fast — even with ~6,800 polygons and ~1,830 DCs, the dominant work per week is checking ≤ 5 polygons against the master_data_centers points.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import date as _date\n", + "\n", + "# Chunk by year for progress visibility, and disable statement_timeout for the\n", + "# duration of this work (USDM polygons are huge; the spatial join is CPU-heavy).\n", + "POPULATE_INDROUGHT_YEAR_SQL = f'''\n", + "insert into {DC_WEEK_TABLE} (master_id, week_date, worst_dm)\n", + "select dc.master_id, w.week_date, max(w.dm_category)::smallint\n", + "from {MASTER_TABLE} dc\n", + "join {WEEKLY_TABLE} w on st_within(dc.geom, w.geom)\n", + "where dc.geom is not null\n", + " and w.week_date >= %s::date and w.week_date < %s::date\n", + "group by dc.master_id, w.week_date\n", + "'''\n", + "\n", + "# After the spatial join, fill in (covered_dc, week) combos that had no drought\n", + "# polygon (-1 = observed but no drought). \"Covered\" = ever-seen-in-drought in 25 yrs,\n", + "# which in practice is every CONUS DC.\n", + "POPULATE_NODROUGHT_SQL = f'''\n", + "insert into {DC_WEEK_TABLE} (master_id, week_date, worst_dm)\n", + "select cd.master_id, aw.week_date, (-1)::smallint\n", + "from (select distinct master_id from {DC_WEEK_TABLE}) cd\n", + "cross join (select distinct week_date from {WEEKLY_TABLE}) aw\n", + "where not exists (\n", + " select 1 from {DC_WEEK_TABLE} dw\n", + " where dw.master_id = cd.master_id\n", + " and dw.week_date = aw.week_date\n", + ")\n", + "'''\n", + "\n", + "if RECOMPUTE_SUMMARY:\n", + " t0 = time.time()\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select extract(year from min(week_date))::int, '\n", + " f'extract(year from max(week_date))::int from {WEEKLY_TABLE}')\n", + " yr_min, yr_max = cur.fetchone()\n", + " print(f'Processing years {yr_min}-{yr_max}')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute('set local statement_timeout = 0')\n", + " cur.execute(f'truncate {DC_WEEK_TABLE}')\n", + "\n", + " for yr in range(yr_min, yr_max + 1):\n", + " t = time.time()\n", + " cur.execute(POPULATE_INDROUGHT_YEAR_SQL,\n", + " (_date(yr, 1, 1), _date(yr + 1, 1, 1)))\n", + " cur.execute(f'select count(*) from {DC_WEEK_TABLE}')\n", + " cum = cur.fetchone()[0]\n", + " print(f' {yr}: drought rows cum = {cum:>10,} '\n", + " f'({time.time() - t:.1f}s)')\n", + "\n", + " print('Filling no-drought weeks for covered DCs...')\n", + " t = time.time()\n", + " cur.execute(POPULATE_NODROUGHT_SQL)\n", + " cur.execute(f'select count(*) from {DC_WEEK_TABLE}')\n", + " total = cur.fetchone()[0]\n", + " print(f' Total rows after fill: {total:,} ({time.time() - t:.1f}s)')\n", + " conn.commit()\n", + "\n", + " print(f'\\nDone in {(time.time() - t0) / 60:.1f} min')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f'select count(distinct master_id), count(distinct week_date) from {DC_WEEK_TABLE}')\n", + " n_dc, n_wk = cur.fetchone()\n", + " print(f' unique DCs={n_dc:,} unique weeks={n_wk:,}')\n" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "## Compute Per-DC Exposure Summary\n", + "\n", + "Aggregates `data_center_usdm_drought_dc_week` into the headline metrics. Also marks DCs that were never seen in any drought polygon as `usdm_status='no_coverage'` (this is a conservative proxy — a CONUS DC that genuinely never had even D0 in 25 years would falsely fall into this bucket, but that's not actually possible in CONUS over a 25-year window, so the proxy holds).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "POPULATE_EXPOSURE_SQL = f'''\n", + "truncate {EXPOSURE_TABLE};\n", + "\n", + "with totals as (\n", + " select count(*) as total_weeks from (select distinct week_date from {WEEKLY_TABLE}) w\n", + "),\n", + "per_dc as (\n", + " select\n", + " master_id,\n", + " count(*) as weeks_observed,\n", + " sum((worst_dm >= 0)::int) as weeks_d0,\n", + " sum((worst_dm >= 1)::int) as weeks_d1,\n", + " sum((worst_dm >= 2)::int) as weeks_d2,\n", + " sum((worst_dm >= 3)::int) as weeks_d3,\n", + " sum((worst_dm >= 4)::int) as weeks_d4,\n", + " max(worst_dm) as worst_dm,\n", + " avg(greatest(worst_dm, 0)::float) as mean_dm,\n", + " min(week_date) as period_start,\n", + " max(week_date) as period_end\n", + " from {DC_WEEK_TABLE}\n", + " group by master_id\n", + "),\n", + "-- Compute consecutive-week streaks per DC using gap-and-island technique.\n", + "-- USDM weeks are spaced 7 days apart; group consecutive weeks above a threshold.\n", + "streaks as (\n", + " select\n", + " master_id,\n", + " max(case when threshold = 0 then run_len end) as longest_d0,\n", + " max(case when threshold = 2 then run_len end) as longest_d2,\n", + " max(case when threshold = 3 then run_len end) as longest_d3\n", + " from (\n", + " select master_id, threshold, count(*) as run_len\n", + " from (\n", + " select\n", + " master_id,\n", + " threshold,\n", + " week_date,\n", + " week_date - (row_number() over (partition by master_id, threshold order by week_date))::int * interval '7 days' as grp\n", + " from (\n", + " select master_id, week_date, worst_dm, 0::int as threshold from {DC_WEEK_TABLE} where worst_dm >= 0\n", + " union all\n", + " select master_id, week_date, worst_dm, 2::int from {DC_WEEK_TABLE} where worst_dm >= 2\n", + " union all\n", + " select master_id, week_date, worst_dm, 3::int from {DC_WEEK_TABLE} where worst_dm >= 3\n", + " ) sources\n", + " ) grouped\n", + " group by master_id, threshold, grp\n", + " ) runs\n", + " group by master_id\n", + ")\n", + "insert into {EXPOSURE_TABLE} (\n", + " master_id, source, name, operator, city, state, country, longitude, latitude, geom,\n", + " usdm_status, drought_period_start, drought_period_end, weeks_observed,\n", + " weeks_in_d0_or_worse, weeks_in_d1_or_worse, weeks_in_d2_or_worse,\n", + " weeks_in_d3_or_worse, weeks_in_d4,\n", + " pct_weeks_in_d0_or_worse, pct_weeks_in_d1_or_worse, pct_weeks_in_d2_or_worse,\n", + " pct_weeks_in_d3_or_worse, pct_weeks_in_d4,\n", + " worst_dm_category, mean_dm_category,\n", + " longest_d0_streak_weeks, longest_d2_streak_weeks, longest_d3_streak_weeks\n", + ")\n", + "select\n", + " dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,\n", + " dc.longitude, dc.latitude, dc.geom,\n", + " case when p.master_id is not null then 'covered' else 'no_coverage' end as usdm_status,\n", + " p.period_start, p.period_end, coalesce(p.weeks_observed, 0),\n", + " coalesce(p.weeks_d0, 0), coalesce(p.weeks_d1, 0), coalesce(p.weeks_d2, 0),\n", + " coalesce(p.weeks_d3, 0), coalesce(p.weeks_d4, 0),\n", + " case when p.weeks_observed > 0 then p.weeks_d0::float / p.weeks_observed end,\n", + " case when p.weeks_observed > 0 then p.weeks_d1::float / p.weeks_observed end,\n", + " case when p.weeks_observed > 0 then p.weeks_d2::float / p.weeks_observed end,\n", + " case when p.weeks_observed > 0 then p.weeks_d3::float / p.weeks_observed end,\n", + " case when p.weeks_observed > 0 then p.weeks_d4::float / p.weeks_observed end,\n", + " p.worst_dm, p.mean_dm,\n", + " coalesce(s.longest_d0, 0),\n", + " coalesce(s.longest_d2, 0),\n", + " coalesce(s.longest_d3, 0)\n", + "from {MASTER_TABLE} dc\n", + "left join per_dc p on p.master_id = dc.master_id\n", + "left join streaks s on s.master_id = dc.master_id;\n", + "\n", + "analyze {EXPOSURE_TABLE};\n", + "'''\n", + "\n", + "if RECOMPUTE_SUMMARY:\n", + " t = time.time()\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(POPULATE_EXPOSURE_SQL)\n", + " conn.commit()\n", + " print(f'Populated {EXPOSURE_TABLE} in {time.time()-t:.0f}s')\n", + "\n", + " with get_conn() as conn:\n", + " with conn.cursor() as cur:\n", + " cur.execute(f\"\"\"select usdm_status, count(*) from {EXPOSURE_TABLE} group by usdm_status order by 1\"\"\")\n", + " print('Status distribution:')\n", + " for r in cur.fetchall(): print(' ', r)\n" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "## Inspect Results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "summary_query = f'''\n", + "with covered as (\n", + " select * from {EXPOSURE_TABLE} where usdm_status = 'covered'\n", + ")\n", + "select\n", + " count(*) as covered_dcs,\n", + " round(avg(pct_weeks_in_d0_or_worse * 100)::numeric, 1) as avg_pct_d0,\n", + " round(avg(pct_weeks_in_d2_or_worse * 100)::numeric, 1) as avg_pct_d2,\n", + " round(avg(pct_weeks_in_d4 * 100)::numeric, 2) as avg_pct_d4,\n", + " round(avg(mean_dm_category)::numeric, 2) as avg_mean_dm,\n", + " max(longest_d2_streak_weeks) as max_d2_streak_weeks\n", + "from covered\n", + "'''\n", + "\n", + "# Sanity checks: states that should rank high (AZ, CA, TX) vs low (WA, OR, NY)\n", + "by_state_query = f'''\n", + "select\n", + " state,\n", + " count(*) as n_dcs,\n", + " round(avg(pct_weeks_in_d2_or_worse * 100)::numeric, 1) as avg_pct_d2,\n", + " round(avg(pct_weeks_in_d4 * 100)::numeric, 2) as avg_pct_d4,\n", + " round(avg(mean_dm_category)::numeric, 2) as avg_mean_dm,\n", + " max(worst_dm_category) as worst_dm,\n", + " round(avg(longest_d2_streak_weeks)::numeric, 1) as avg_longest_d2_weeks\n", + "from {EXPOSURE_TABLE}\n", + "where usdm_status = 'covered'\n", + "group by state\n", + "order by avg_pct_d2 desc nulls last\n", + "limit 15\n", + "'''\n", + "\n", + "# Top 10 worst-exposed individual DCs\n", + "worst_query = f'''\n", + "select\n", + " master_id, name, state,\n", + " round((pct_weeks_in_d2_or_worse * 100)::numeric)::int as pct_weeks_d2,\n", + " round((pct_weeks_in_d4 * 100)::numeric, 1) as pct_weeks_d4,\n", + " weeks_in_d4, worst_dm_category,\n", + " longest_d2_streak_weeks\n", + "from {EXPOSURE_TABLE}\n", + "where usdm_status = 'covered'\n", + "order by mean_dm_category desc, pct_weeks_in_d2_or_worse desc\n", + "limit 10\n", + "'''\n", + "\n", + "with get_conn() as conn:\n", + " summary = pd.read_sql_query(summary_query, conn)\n", + " by_state = pd.read_sql_query(by_state_query, conn)\n", + " worst = pd.read_sql_query(worst_query, conn)\n", + "\n", + "print('=== Overall ===')\n", + "display(summary)\n", + "print('\\n=== Top 15 states by avg severe-drought share ===')\n", + "display(by_state)\n", + "print('\\n=== Top 10 most drought-exposed individual DCs ===')\n", + "display(worst)\n" + ] + }, + { + "cell_type": "markdown", + "id": "15", + "metadata": {}, + "source": [ + "## Notes\n", + "\n", + "- **`-1` in `data_center_usdm_drought_dc_week.worst_dm` means \"observed week, no drought polygon contained the DC\"** (i.e., D-nothing). Filter `worst_dm >= 0` for actual drought weeks.\n", + "- **`pct_weeks_in_d2_or_worse` is the headline metric** for site selection. D2 = \"Severe Drought\" — the threshold at which water restrictions typically kick in.\n", + "- **Streak metrics** count consecutive weekly observations meeting a threshold. USDM is weekly so the unit is weeks.\n", + "- **`usdm_status='no_coverage'`** identifies DCs outside USDM coverage (e.g., far-overseas territories). The CONUS DCs are all covered; PR is covered by USDM.\n", + "- The `data_center_usdm_drought_dc_week` table is the long-form per-week record. Useful for time-series analyses, but if you only need the per-DC summary you can drop it.\n" + ] + }, + { + "cell_type": "markdown", + "id": "16", + "metadata": {}, + "source": [ + "## Tables Created\n", + "\n", + "This notebook builds three tables in the `public` schema, all keyed (directly or transitively) to `master_data_centers.master_id`.\n", + "\n", + "---\n", + "\n", + "### 1. `public.usdm_drought_weekly`\n", + "\n", + "Raw weekly USDM drought polygons — one row per `(week_date, dm_category)` (occasionally multiple rows for early-USDM weeks that published per-category fragments). Source of truth for any later spatial query against the drought record.\n", + "\n", + "| Column | Type | Meaning |\n", + "|---|---|---|\n", + "| `id` | `bigserial` PK | Surrogate row id |\n", + "| `week_date` | `date` | Tuesday-of-publication date parsed from filename (`USDM_YYYYMMDD_M.zip`) |\n", + "| `dm_category` | `smallint` | 0=D0 Abnormally Dry, 1=D1 Moderate, 2=D2 Severe, 3=D3 Extreme, 4=D4 Exceptional. **Cumulative** — D4 polygon is inside D3 inside D2… |\n", + "| `objectid`, `shape_leng`, `shape_area` | original shapefile attributes |\n", + "| `geom` | `geometry(MultiPolygon, 4326)` | Drought-affected area for that category that week |\n", + "\n", + "**Indexes:** GIST on `geom`, btree on `week_date`.\n", + "\n", + "**Size:** ~12,000 polygon rows across 1,356 weeks (Jan 2000 – mid 2025).\n", + "\n", + "**Example uses:**\n", + "```sql\n", + "-- Map of D3+ drought in August 2022\n", + "SELECT week_date, dm_category, geom\n", + "FROM usdm_drought_weekly\n", + "WHERE week_date = '2022-08-30' AND dm_category >= 3;\n", + "\n", + "-- Worst week ever for a specific lat/lon\n", + "SELECT week_date, MAX(dm_category) AS worst_dm\n", + "FROM usdm_drought_weekly\n", + "WHERE ST_Within(ST_SetSRID(ST_MakePoint(-98.5, 29.5), 4326), geom)\n", + "GROUP BY week_date ORDER BY worst_dm DESC, week_date LIMIT 10;\n", + "```\n", + "\n", + "---\n", + "\n", + "### 2. `public.data_center_usdm_drought_dc_week`\n", + "\n", + "Long-form per-(DC, week) intermediate. One row per data center per USDM week observed; useful for time-series and streak analysis. Computed from `usdm_drought_weekly` via spatial join, then back-filled so every covered DC has a row for every week.\n", + "\n", + "| Column | Type | Meaning |\n", + "|---|---|---|\n", + "| `master_id` | `text` PK (composite) | FK → `master_data_centers.master_id` |\n", + "| `week_date` | `date` PK (composite) | USDM week |\n", + "| `worst_dm` | `smallint` | Max `dm_category` whose polygon contained the DC point that week. **`-1` means observed week but no drought polygon contained the DC** (filter `worst_dm >= 0` for actual drought weeks) |\n", + "\n", + "**Indexes:** PK on `(master_id, week_date)`, btree on `week_date`, btree on `worst_dm`.\n", + "\n", + "**Size:** ~2.5 M rows (1,833 DCs × 1,356 weeks, minus DCs not covered by USDM).\n", + "\n", + "**Example uses:**\n", + "```sql\n", + "-- Drought timeline for one DC\n", + "SELECT week_date, worst_dm\n", + "FROM data_center_usdm_drought_dc_week\n", + "WHERE master_id = 'curated/1010260676' AND worst_dm >= 0\n", + "ORDER BY week_date;\n", + "\n", + "-- DCs that were in D4 during a specific week\n", + "SELECT master_id FROM data_center_usdm_drought_dc_week\n", + "WHERE week_date = '2012-07-24' AND worst_dm = 4;\n", + "```\n", + "\n", + "If you only need the per-DC summary, this table can be dropped — it's regenerable from `usdm_drought_weekly` + `master_data_centers`.\n", + "\n", + "---\n", + "\n", + "### 3. `public.data_center_usdm_drought_exposure`\n", + "\n", + "Per-DC drought-exposure summary keyed by `master_id`. The analytical surface — one row per data center with all the headline metrics. Joinable directly to `master_data_centers` and `data_center_historical_climate`.\n", + "\n", + "| Column | Type | Meaning |\n", + "|---|---|---|\n", + "| `master_id` | `text` PK | FK → `master_data_centers.master_id` |\n", + "| Identity cols | `source`, `name`, `operator`, `city`, `state`, `country`, `longitude`, `latitude`, `geom` — denormalized from master for convenience |\n", + "| `usdm_status` | `text` | `'covered'` (USDM zone) or `'no_coverage'` (outside USDM extent) |\n", + "| `drought_period_start`, `drought_period_end` | `date` | First / last USDM week observed for this DC |\n", + "| `weeks_observed` | `int` | Total weekly observations |\n", + "| `weeks_in_d0_or_worse` … `weeks_in_d4` | `int` | Cumulative weekly counts at each severity threshold |\n", + "| `pct_weeks_in_d0_or_worse` … `pct_weeks_in_d4` | `double` | Same as ratios over `weeks_observed` |\n", + "| `worst_dm_category` | `smallint` | Max DM ever experienced (0–4) |\n", + "| `mean_dm_category` | `double` | Average DM across all weeks, treating no-drought (`-1`) as 0 |\n", + "| `longest_d0_streak_weeks` | `int` | Longest consecutive run with any drought (D0+) |\n", + "| `longest_d2_streak_weeks` | `int` | Longest consecutive run with severe drought (D2+) — **the headline streak metric** |\n", + "| `longest_d3_streak_weeks` | `int` | Longest consecutive run with extreme drought (D3+) |\n", + "| `fetched_at`, `updated_at` | `timestamptz` | Provenance |\n", + "\n", + "**Indexes:** GIST on `geom`, btree on `state`, btree on `worst_dm_category`.\n", + "\n", + "**Size:** 1,833 rows (one per master DC; PR sites flagged `no_coverage` if applicable).\n", + "\n", + "**Headline metric for site-selection analysis:** `pct_weeks_in_d2_or_worse`. D2 = \"Severe Drought\" is the threshold at which water-use restrictions typically kick in for utilities and municipalities.\n", + "\n", + "**Example: joined climate + drought view for cooling-water risk analysis**\n", + "```sql\n", + "SELECT\n", + " c.master_id, c.name, c.state,\n", + " c.cooling_degree_days_c, -- baseline cooling load\n", + " c.mean_wet_bulb_temperature_c, -- evaporative-cooling efficiency\n", + " d.pct_weeks_in_d2_or_worse * 100 AS pct_severe_drought,\n", + " d.longest_d2_streak_weeks,\n", + " d.worst_dm_category\n", + "FROM data_center_historical_climate c\n", + "JOIN data_center_usdm_drought_exposure d USING (master_id)\n", + "WHERE d.usdm_status = 'covered'\n", + "ORDER BY (c.cooling_degree_days_c * d.pct_weeks_in_d2_or_worse) DESC\n", + "LIMIT 25;\n", + "```\n", + "\n", + "---\n", + "\n", + "### Relationship diagram\n", + "\n", + "```\n", + "master_data_centers (master_id PK)\n", + " │\n", + " ├── data_center_historical_climate (master_id PK) ← from open_meteo/Daymet/gridMET notebook\n", + " │\n", + " └── data_center_usdm_drought_exposure (master_id PK) ← this notebook\n", + " │\n", + " └── data_center_usdm_drought_dc_week (master_id, week_date)\n", + " │\n", + " └── usdm_drought_weekly (id PK, week_date, dm_category, geom)\n", + "```\n", + "\n", + "All three USDM tables are regenerable from the zip files in `USDM Shape Files/`. `RELOAD_WEEKLY=True` rebuilds from scratch; `RECOMPUTE_SUMMARY=True` (default) recomputes the dc-week + exposure tables from whatever's in `usdm_drought_weekly`.\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}