Files
data-centers/usdm_drought_data_centers.ipynb

731 lines
31 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"id": "0",
"metadata": {},
"source": [
"# USDM Drought Exposure for Master Data Centers\n",
"\n",
"Builds two tables from the US Drought Monitor weekly shapefiles in `USDM Shape Files/`:\n",
"\n",
"1. **`public.usdm_drought_weekly`** — raw weekly drought polygons (one row per `(week_date, dm_category)`). ~5 rows × ~1,350 weeks ≈ 6,800 polygons. Useful for later spatial queries (\"show me the drought map for Aug 2022\", county-level aggregation, etc.).\n",
"2. **`public.data_center_usdm_drought_exposure`** — per-DC summary keyed by `master_id`, joinable to `master_data_centers` and `data_center_historical_climate`.\n",
"\n",
"**USDM `DM` scale** (cumulative — D4 is contained within D3 is within D2…):\n",
"\n",
"| `DM` | Category | Label |\n",
"|---|---|---|\n",
"| 0 | D0 | Abnormally Dry |\n",
"| 1 | D1 | Moderate Drought |\n",
"| 2 | D2 | Severe Drought |\n",
"| 3 | D3 | Extreme Drought |\n",
"| 4 | D4 | Exceptional Drought |\n",
"\n",
"For each `(DC, week)` we record the **maximum `DM` category whose polygon contains the DC point** (or `-1` if no polygon contains it = no drought that week).\n",
"\n",
"**Coverage period:** 2000-01-04 → ~2025-mid (whatever the latest weekly file is). USDM started Jan 2000.\n",
"\n",
"**Coverage area:** CONUS + AK + HI + PR + US territories. DCs outside coverage get `usdm_status='no_coverage'` and zero weeks.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"import io\n",
"import json\n",
"import os\n",
"import re\n",
"import time\n",
"import zipfile\n",
"from datetime import date\n",
"from pathlib import Path\n",
"\n",
"import geopandas as gpd\n",
"import numpy as np\n",
"import pandas as pd\n",
"import psycopg2\n",
"from psycopg2 import sql\n",
"from psycopg2.extras import execute_values\n",
"from shapely import wkb as shapely_wkb\n",
"\n",
"pd.set_option('display.max_columns', 100)\n",
"pd.set_option('display.max_rows', 120)\n",
"\n",
"print('geopandas:', gpd.__version__)\n",
"print('pandas: ', pd.__version__)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"def load_env_file(env_path: str = '.env') -> None:\n",
" p = Path(env_path)\n",
" if not p.exists():\n",
" print(f'No {env_path} file found in {Path.cwd()}')\n",
" return\n",
" loaded = 0\n",
" for raw_line in p.read_text(encoding='utf-8').splitlines():\n",
" line = raw_line.strip()\n",
" if not line or line.startswith('#') or '=' not in line:\n",
" continue\n",
" key, value = line.split('=', 1)\n",
" key = key.strip()\n",
" value = value.strip().strip('\"').strip(\"'\")\n",
" if key and key not in os.environ:\n",
" os.environ[key] = value\n",
" loaded += 1\n",
" print(f'Loaded {loaded} env var(s) from {env_path}')\n",
"\n",
"\n",
"def require_env(keys):\n",
" missing = [k for k in keys if not os.getenv(k)]\n",
" if missing:\n",
" raise EnvironmentError(\n",
" 'Missing required env vars: ' + ', '.join(missing))\n",
"\n",
"\n",
"load_env_file('.env')\n",
"require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])\n",
"\n",
"MASTER_TABLE = 'public.master_data_centers'\n",
"WEEKLY_TABLE = 'public.usdm_drought_weekly'\n",
"DC_WEEK_TABLE = 'public.data_center_usdm_drought_dc_week'\n",
"EXPOSURE_TABLE = 'public.data_center_usdm_drought_exposure'\n",
"\n",
"\n",
"def get_conn():\n",
" return psycopg2.connect(\n",
" host=os.environ['PGWEB_HOST'],\n",
" port=os.environ['PGWEB_PORT'],\n",
" user=os.environ['PGWEB_USER'],\n",
" password=os.environ['PGWEB_PASSWORD'],\n",
" dbname='data_centers',\n",
" )\n",
"\n",
"\n",
"with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute('select current_database(), current_user')\n",
" print('Connected:', cur.fetchone())\n",
" cur.execute('create extension if not exists postgis')\n",
" cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n",
" if cur.fetchone()[0] is None:\n",
" raise RuntimeError(f'{MASTER_TABLE} missing')\n",
" cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n",
" print(f'{MASTER_TABLE}: {cur.fetchone()[0]:,} rows')\n"
]
},
{
"cell_type": "markdown",
"id": "3",
"metadata": {},
"source": [
"## Parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"USDM_DIR = Path('USDM Shape Files')\n",
"\n",
"# If True, drop and rebuild the weekly polygon table from scratch.\n",
"# If False, only insert weeks that aren't already in the table.\n",
"RELOAD_WEEKLY = False\n",
"\n",
"# If True, drop and rebuild the DC-week and exposure summary tables.\n",
"RECOMPUTE_SUMMARY = True\n",
"\n",
"assert USDM_DIR.exists(), f'{USDM_DIR.resolve()} not found'\n",
"yearly = sorted(USDM_DIR.glob('*_USDM_M.zip'))\n",
"print(f'Yearly zips: {len(yearly)}')\n",
"print(f' first: {yearly[0].name}')\n",
"print(f' last: {yearly[-1].name}')\n"
]
},
{
"cell_type": "markdown",
"id": "5",
"metadata": {},
"source": [
"## Create Tables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6",
"metadata": {},
"outputs": [],
"source": [
"CREATE_WEEKLY_SQL = f'''\n",
"create table if not exists {WEEKLY_TABLE} (\n",
" id bigserial primary key,\n",
" week_date date not null,\n",
" dm_category smallint not null check (dm_category between 0 and 4),\n",
" objectid integer,\n",
" shape_leng double precision,\n",
" shape_area double precision,\n",
" geom geometry(MultiPolygon, 4326) not null\n",
");\n",
"\n",
"create index if not exists usdm_drought_weekly_geom_gix\n",
" on {WEEKLY_TABLE} using gist (geom);\n",
"create index if not exists usdm_drought_weekly_date_idx\n",
" on {WEEKLY_TABLE} (week_date);\n",
"'''\n",
"\n",
"CREATE_DC_WEEK_SQL = f'''\n",
"create table if not exists {DC_WEEK_TABLE} (\n",
" master_id text not null references public.master_data_centers(master_id) on delete cascade,\n",
" week_date date not null,\n",
" worst_dm smallint not null, -- 0..4; -1 means observed week but no drought polygon contained the DC\n",
" primary key (master_id, week_date)\n",
");\n",
"\n",
"create index if not exists dc_week_drought_date_idx\n",
" on {DC_WEEK_TABLE} (week_date);\n",
"create index if not exists dc_week_drought_worst_idx\n",
" on {DC_WEEK_TABLE} (worst_dm);\n",
"'''\n",
"\n",
"CREATE_EXPOSURE_SQL = f'''\n",
"create table if not exists {EXPOSURE_TABLE} (\n",
" master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n",
" source text, name text, operator text, city text, state text, country text,\n",
" longitude double precision, latitude double precision,\n",
" geom geometry(Point, 4326),\n",
"\n",
" usdm_status text not null, -- 'covered' or 'no_coverage'\n",
" drought_period_start date,\n",
" drought_period_end date,\n",
" weeks_observed integer not null default 0,\n",
"\n",
" weeks_in_d0_or_worse integer not null default 0,\n",
" weeks_in_d1_or_worse integer not null default 0,\n",
" weeks_in_d2_or_worse integer not null default 0,\n",
" weeks_in_d3_or_worse integer not null default 0,\n",
" weeks_in_d4 integer not null default 0,\n",
"\n",
" pct_weeks_in_d0_or_worse double precision,\n",
" pct_weeks_in_d1_or_worse double precision,\n",
" pct_weeks_in_d2_or_worse double precision,\n",
" pct_weeks_in_d3_or_worse double precision,\n",
" pct_weeks_in_d4 double precision,\n",
"\n",
" worst_dm_category smallint, -- max DM ever observed; null if usdm_status='no_coverage'\n",
" mean_dm_category double precision, -- avg DM, treating no-drought weeks as 0\n",
" longest_d0_streak_weeks integer,\n",
" longest_d2_streak_weeks integer,\n",
" longest_d3_streak_weeks integer,\n",
"\n",
" fetched_at timestamptz not null default now(),\n",
" updated_at timestamptz not null default now()\n",
");\n",
"\n",
"create index if not exists data_center_usdm_exposure_state_idx\n",
" on {EXPOSURE_TABLE} (state);\n",
"create index if not exists data_center_usdm_exposure_geom_gix\n",
" on {EXPOSURE_TABLE} using gist (geom);\n",
"create index if not exists data_center_usdm_exposure_worst_idx\n",
" on {EXPOSURE_TABLE} (worst_dm_category);\n",
"'''\n",
"\n",
"with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" if RELOAD_WEEKLY:\n",
" cur.execute(f'drop table if exists {WEEKLY_TABLE} cascade')\n",
" cur.execute(f'drop table if exists {DC_WEEK_TABLE} cascade')\n",
" cur.execute(f'drop table if exists {EXPOSURE_TABLE} cascade')\n",
" cur.execute(CREATE_WEEKLY_SQL)\n",
" cur.execute(CREATE_DC_WEEK_SQL)\n",
" cur.execute(CREATE_EXPOSURE_SQL)\n",
" for t in (WEEKLY_TABLE, DC_WEEK_TABLE, EXPOSURE_TABLE):\n",
" cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(t)))\n",
" print(f'{t}: {cur.fetchone()[0]:,} rows')\n"
]
},
{
"cell_type": "markdown",
"id": "7",
"metadata": {},
"source": [
"## Load Weekly Shapefiles into PostGIS\n",
"\n",
"For each yearly zip:\n",
"- Open it without extracting to disk.\n",
"- For each inner weekly zip (`USDM_YYYYMMDD_M.zip`), extract to a temp dir.\n",
"- Read the shapefile with `geopandas`.\n",
"- Re-project to EPSG:4326 if needed (USDM is already 4326).\n",
"- Insert (week_date, dm_category, geom) rows.\n",
"\n",
"`RELOAD_WEEKLY=False` skips weeks already present, so a re-run is cheap.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8",
"metadata": {},
"outputs": [],
"source": [
"DATE_RX = re.compile(r'USDM_(\\d{8})', re.IGNORECASE)\n",
"\n",
"\n",
"def parse_week_date_from_name(name: str) -> date:\n",
" m = DATE_RX.search(name)\n",
" if not m:\n",
" raise ValueError(f'No date in {name}')\n",
" s = m.group(1)\n",
" return date(int(s[:4]), int(s[4:6]), int(s[6:8]))\n",
"\n",
"\n",
"def existing_weeks() -> set:\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute(f'select distinct week_date from {WEEKLY_TABLE}')\n",
" return {r[0] for r in cur.fetchall()}\n",
"\n",
"\n",
"def load_one_weekly(inner_bytes: bytes, week_date_val: date, cur):\n",
" \"\"\"Extract one inner weekly zip in memory, read shapefile, insert rows.\"\"\"\n",
" # The inner zip contains .shp/.shx/.dbf/.prj — write all to a temp dir.\n",
" import tempfile\n",
" with tempfile.TemporaryDirectory() as td:\n",
" with zipfile.ZipFile(io.BytesIO(inner_bytes)) as zf:\n",
" zf.extractall(td)\n",
" shp_files = list(Path(td).glob('*.shp'))\n",
" if not shp_files:\n",
" raise RuntimeError(f'No .shp in {week_date_val}')\n",
" gdf = gpd.read_file(shp_files[0])\n",
" if gdf.crs is None:\n",
" gdf = gdf.set_crs('EPSG:4326')\n",
" elif str(gdf.crs).lower() not in ('epsg:4326', 'wgs 84'):\n",
" gdf = gdf.to_crs('EPSG:4326')\n",
"\n",
" rows = []\n",
" for _, r in gdf.iterrows():\n",
" geom = r['geometry']\n",
" if geom is None or geom.is_empty:\n",
" continue\n",
" # Coerce to MultiPolygon (some weeks have Polygon, some have MultiPolygon)\n",
" if geom.geom_type == 'Polygon':\n",
" from shapely.geometry import MultiPolygon\n",
" geom = MultiPolygon([geom])\n",
" rows.append((\n",
" week_date_val,\n",
" int(r['DM']),\n",
" int(r.get('OBJECTID', 0)) if pd.notna(r.get('OBJECTID')) else None,\n",
" float(r.get('Shape_Leng', 0.0)) if pd.notna(r.get('Shape_Leng')) else None,\n",
" float(r.get('Shape_Area', 0.0)) if pd.notna(r.get('Shape_Area')) else None,\n",
" psycopg2.Binary(shapely_wkb.dumps(geom, hex=False)),\n",
" ))\n",
"\n",
" execute_values(\n",
" cur,\n",
" f'''insert into {WEEKLY_TABLE} (week_date, dm_category, objectid, shape_leng, shape_area, geom)\n",
" values %s''',\n",
" rows,\n",
" template='(%s, %s, %s, %s, %s, ST_Multi(ST_GeomFromWKB(%s, 4326)))',\n",
" page_size=20,\n",
" )\n",
" return len(rows)\n",
"\n",
"\n",
"def load_all_weeklies():\n",
" have = set() if RELOAD_WEEKLY else existing_weeks()\n",
" print(f'Already in DB: {len(have):,} weeks')\n",
" total_inserted_weeks = 0\n",
" total_rows = 0\n",
" t_start = time.time()\n",
"\n",
" for yearly_path in sorted(USDM_DIR.glob('*_USDM_M.zip')):\n",
" with zipfile.ZipFile(yearly_path) as yz:\n",
" inner_zips = sorted(n for n in yz.namelist() if n.lower().endswith('.zip'))\n",
" year_inserted = 0\n",
" year_skipped = 0\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" for inner_name in inner_zips:\n",
" wd = parse_week_date_from_name(inner_name)\n",
" if wd in have:\n",
" year_skipped += 1\n",
" continue\n",
" inner_bytes = yz.read(inner_name)\n",
" n = load_one_weekly(inner_bytes, wd, cur)\n",
" have.add(wd)\n",
" total_inserted_weeks += 1\n",
" year_inserted += 1\n",
" total_rows += n\n",
" conn.commit()\n",
" elapsed = time.time() - t_start\n",
" print(f' {yearly_path.name}: +{year_inserted} weeks ({year_skipped} skipped) '\n",
" f'-- running total {total_inserted_weeks:,} weeks / {total_rows:,} rows / {elapsed:.0f}s')\n",
"\n",
" print(f'\\nDone: inserted {total_inserted_weeks:,} new weeks, {total_rows:,} polygon rows')\n",
"\n",
"\n",
"load_all_weeklies()\n"
]
},
{
"cell_type": "markdown",
"id": "9",
"metadata": {},
"source": [
"## Compute per-DC weekly worst-DM\n",
"\n",
"Single spatial join: for each `(DC, week)`, take the **max `dm_category`** of any drought polygon whose geometry contains the DC point.\n",
"\n",
"The CROSS JOIN with `all_weeks` then LEFT JOIN ensures that weeks where the DC was in *no* drought polygon still get a row (with `worst_dm = -1`), which is required for correct percentage and streak calculations.\n",
"\n",
"PostGIS uses the GIST index on `usdm_drought_weekly.geom` plus the btree on `week_date` to make this fast — even with ~6,800 polygons and ~1,830 DCs, the dominant work per week is checking ≤ 5 polygons against the master_data_centers points.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "10",
"metadata": {},
"outputs": [],
"source": [
"from datetime import date as _date\n",
"\n",
"# Chunk by year for progress visibility, and disable statement_timeout for the\n",
"# duration of this work (USDM polygons are huge; the spatial join is CPU-heavy).\n",
"POPULATE_INDROUGHT_YEAR_SQL = f'''\n",
"insert into {DC_WEEK_TABLE} (master_id, week_date, worst_dm)\n",
"select dc.master_id, w.week_date, max(w.dm_category)::smallint\n",
"from {MASTER_TABLE} dc\n",
"join {WEEKLY_TABLE} w on st_within(dc.geom, w.geom)\n",
"where dc.geom is not null\n",
" and w.week_date >= %s::date and w.week_date < %s::date\n",
"group by dc.master_id, w.week_date\n",
"'''\n",
"\n",
"# After the spatial join, fill in (covered_dc, week) combos that had no drought\n",
"# polygon (-1 = observed but no drought). \"Covered\" = ever-seen-in-drought in 25 yrs,\n",
"# which in practice is every CONUS DC.\n",
"POPULATE_NODROUGHT_SQL = f'''\n",
"insert into {DC_WEEK_TABLE} (master_id, week_date, worst_dm)\n",
"select cd.master_id, aw.week_date, (-1)::smallint\n",
"from (select distinct master_id from {DC_WEEK_TABLE}) cd\n",
"cross join (select distinct week_date from {WEEKLY_TABLE}) aw\n",
"where not exists (\n",
" select 1 from {DC_WEEK_TABLE} dw\n",
" where dw.master_id = cd.master_id\n",
" and dw.week_date = aw.week_date\n",
")\n",
"'''\n",
"\n",
"if RECOMPUTE_SUMMARY:\n",
" t0 = time.time()\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute(f'select extract(year from min(week_date))::int, '\n",
" f'extract(year from max(week_date))::int from {WEEKLY_TABLE}')\n",
" yr_min, yr_max = cur.fetchone()\n",
" print(f'Processing years {yr_min}-{yr_max}')\n",
"\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute('set local statement_timeout = 0')\n",
" cur.execute(f'truncate {DC_WEEK_TABLE}')\n",
"\n",
" for yr in range(yr_min, yr_max + 1):\n",
" t = time.time()\n",
" cur.execute(POPULATE_INDROUGHT_YEAR_SQL,\n",
" (_date(yr, 1, 1), _date(yr + 1, 1, 1)))\n",
" cur.execute(f'select count(*) from {DC_WEEK_TABLE}')\n",
" cum = cur.fetchone()[0]\n",
" print(f' {yr}: drought rows cum = {cum:>10,} '\n",
" f'({time.time() - t:.1f}s)')\n",
"\n",
" print('Filling no-drought weeks for covered DCs...')\n",
" t = time.time()\n",
" cur.execute(POPULATE_NODROUGHT_SQL)\n",
" cur.execute(f'select count(*) from {DC_WEEK_TABLE}')\n",
" total = cur.fetchone()[0]\n",
" print(f' Total rows after fill: {total:,} ({time.time() - t:.1f}s)')\n",
" conn.commit()\n",
"\n",
" print(f'\\nDone in {(time.time() - t0) / 60:.1f} min')\n",
"\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute(f'select count(distinct master_id), count(distinct week_date) from {DC_WEEK_TABLE}')\n",
" n_dc, n_wk = cur.fetchone()\n",
" print(f' unique DCs={n_dc:,} unique weeks={n_wk:,}')\n"
]
},
{
"cell_type": "markdown",
"id": "11",
"metadata": {},
"source": [
"## Compute Per-DC Exposure Summary\n",
"\n",
"Aggregates `data_center_usdm_drought_dc_week` into the headline metrics. Also marks DCs that were never seen in any drought polygon as `usdm_status='no_coverage'` (this is a conservative proxy — a CONUS DC that genuinely never had even D0 in 25 years would falsely fall into this bucket, but that's not actually possible in CONUS over a 25-year window, so the proxy holds).\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "12",
"metadata": {},
"outputs": [],
"source": [
"POPULATE_EXPOSURE_SQL = f'''\n",
"truncate {EXPOSURE_TABLE};\n",
"\n",
"with totals as (\n",
" select count(*) as total_weeks from (select distinct week_date from {WEEKLY_TABLE}) w\n",
"),\n",
"per_dc as (\n",
" select\n",
" master_id,\n",
" count(*) as weeks_observed,\n",
" sum((worst_dm >= 0)::int) as weeks_d0,\n",
" sum((worst_dm >= 1)::int) as weeks_d1,\n",
" sum((worst_dm >= 2)::int) as weeks_d2,\n",
" sum((worst_dm >= 3)::int) as weeks_d3,\n",
" sum((worst_dm >= 4)::int) as weeks_d4,\n",
" max(worst_dm) as worst_dm,\n",
" avg(greatest(worst_dm, 0)::float) as mean_dm,\n",
" min(week_date) as period_start,\n",
" max(week_date) as period_end\n",
" from {DC_WEEK_TABLE}\n",
" group by master_id\n",
"),\n",
"-- Compute consecutive-week streaks per DC using gap-and-island technique.\n",
"-- USDM weeks are spaced 7 days apart; group consecutive weeks above a threshold.\n",
"streaks as (\n",
" select\n",
" master_id,\n",
" max(case when threshold = 0 then run_len end) as longest_d0,\n",
" max(case when threshold = 2 then run_len end) as longest_d2,\n",
" max(case when threshold = 3 then run_len end) as longest_d3\n",
" from (\n",
" select master_id, threshold, count(*) as run_len\n",
" from (\n",
" select\n",
" master_id,\n",
" threshold,\n",
" week_date,\n",
" week_date - (row_number() over (partition by master_id, threshold order by week_date))::int * interval '7 days' as grp\n",
" from (\n",
" select master_id, week_date, worst_dm, 0::int as threshold from {DC_WEEK_TABLE} where worst_dm >= 0\n",
" union all\n",
" select master_id, week_date, worst_dm, 2::int from {DC_WEEK_TABLE} where worst_dm >= 2\n",
" union all\n",
" select master_id, week_date, worst_dm, 3::int from {DC_WEEK_TABLE} where worst_dm >= 3\n",
" ) sources\n",
" ) grouped\n",
" group by master_id, threshold, grp\n",
" ) runs\n",
" group by master_id\n",
")\n",
"insert into {EXPOSURE_TABLE} (\n",
" master_id, source, name, operator, city, state, country, longitude, latitude, geom,\n",
" usdm_status, drought_period_start, drought_period_end, weeks_observed,\n",
" weeks_in_d0_or_worse, weeks_in_d1_or_worse, weeks_in_d2_or_worse,\n",
" weeks_in_d3_or_worse, weeks_in_d4,\n",
" pct_weeks_in_d0_or_worse, pct_weeks_in_d1_or_worse, pct_weeks_in_d2_or_worse,\n",
" pct_weeks_in_d3_or_worse, pct_weeks_in_d4,\n",
" worst_dm_category, mean_dm_category,\n",
" longest_d0_streak_weeks, longest_d2_streak_weeks, longest_d3_streak_weeks\n",
")\n",
"select\n",
" dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,\n",
" dc.longitude, dc.latitude, dc.geom,\n",
" case when p.master_id is not null then 'covered' else 'no_coverage' end as usdm_status,\n",
" p.period_start, p.period_end, coalesce(p.weeks_observed, 0),\n",
" coalesce(p.weeks_d0, 0), coalesce(p.weeks_d1, 0), coalesce(p.weeks_d2, 0),\n",
" coalesce(p.weeks_d3, 0), coalesce(p.weeks_d4, 0),\n",
" case when p.weeks_observed > 0 then p.weeks_d0::float / p.weeks_observed end,\n",
" case when p.weeks_observed > 0 then p.weeks_d1::float / p.weeks_observed end,\n",
" case when p.weeks_observed > 0 then p.weeks_d2::float / p.weeks_observed end,\n",
" case when p.weeks_observed > 0 then p.weeks_d3::float / p.weeks_observed end,\n",
" case when p.weeks_observed > 0 then p.weeks_d4::float / p.weeks_observed end,\n",
" p.worst_dm, p.mean_dm,\n",
" coalesce(s.longest_d0, 0),\n",
" coalesce(s.longest_d2, 0),\n",
" coalesce(s.longest_d3, 0)\n",
"from {MASTER_TABLE} dc\n",
"left join per_dc p on p.master_id = dc.master_id\n",
"left join streaks s on s.master_id = dc.master_id;\n",
"\n",
"analyze {EXPOSURE_TABLE};\n",
"'''\n",
"\n",
"if RECOMPUTE_SUMMARY:\n",
" t = time.time()\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute(POPULATE_EXPOSURE_SQL)\n",
" conn.commit()\n",
" print(f'Populated {EXPOSURE_TABLE} in {time.time()-t:.0f}s')\n",
"\n",
" with get_conn() as conn:\n",
" with conn.cursor() as cur:\n",
" cur.execute(f\"\"\"select usdm_status, count(*) from {EXPOSURE_TABLE} group by usdm_status order by 1\"\"\")\n",
" print('Status distribution:')\n",
" for r in cur.fetchall(): print(' ', r)\n"
]
},
{
"cell_type": "markdown",
"id": "13",
"metadata": {},
"source": [
"## Inspect Results"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "14",
"metadata": {},
"outputs": [],
"source": [
"summary_query = f'''\n",
"with covered as (\n",
" select * from {EXPOSURE_TABLE} where usdm_status = 'covered'\n",
")\n",
"select\n",
" count(*) as covered_dcs,\n",
" round(avg(pct_weeks_in_d0_or_worse * 100)::numeric, 1) as avg_pct_d0,\n",
" round(avg(pct_weeks_in_d2_or_worse * 100)::numeric, 1) as avg_pct_d2,\n",
" round(avg(pct_weeks_in_d4 * 100)::numeric, 2) as avg_pct_d4,\n",
" round(avg(mean_dm_category)::numeric, 2) as avg_mean_dm,\n",
" max(longest_d2_streak_weeks) as max_d2_streak_weeks\n",
"from covered\n",
"'''\n",
"\n",
"# Sanity checks: states that should rank high (AZ, CA, TX) vs low (WA, OR, NY)\n",
"by_state_query = f'''\n",
"select\n",
" state,\n",
" count(*) as n_dcs,\n",
" round(avg(pct_weeks_in_d2_or_worse * 100)::numeric, 1) as avg_pct_d2,\n",
" round(avg(pct_weeks_in_d4 * 100)::numeric, 2) as avg_pct_d4,\n",
" round(avg(mean_dm_category)::numeric, 2) as avg_mean_dm,\n",
" max(worst_dm_category) as worst_dm,\n",
" round(avg(longest_d2_streak_weeks)::numeric, 1) as avg_longest_d2_weeks\n",
"from {EXPOSURE_TABLE}\n",
"where usdm_status = 'covered'\n",
"group by state\n",
"order by avg_pct_d2 desc nulls last\n",
"limit 15\n",
"'''\n",
"\n",
"# Top 10 worst-exposed individual DCs\n",
"worst_query = f'''\n",
"select\n",
" master_id, name, state,\n",
" round((pct_weeks_in_d2_or_worse * 100)::numeric)::int as pct_weeks_d2,\n",
" round((pct_weeks_in_d4 * 100)::numeric, 1) as pct_weeks_d4,\n",
" weeks_in_d4, worst_dm_category,\n",
" longest_d2_streak_weeks\n",
"from {EXPOSURE_TABLE}\n",
"where usdm_status = 'covered'\n",
"order by mean_dm_category desc, pct_weeks_in_d2_or_worse desc\n",
"limit 10\n",
"'''\n",
"\n",
"with get_conn() as conn:\n",
" summary = pd.read_sql_query(summary_query, conn)\n",
" by_state = pd.read_sql_query(by_state_query, conn)\n",
" worst = pd.read_sql_query(worst_query, conn)\n",
"\n",
"print('=== Overall ===')\n",
"display(summary)\n",
"print('\\n=== Top 15 states by avg severe-drought share ===')\n",
"display(by_state)\n",
"print('\\n=== Top 10 most drought-exposed individual DCs ===')\n",
"display(worst)\n"
]
},
{
"cell_type": "markdown",
"id": "15",
"metadata": {},
"source": [
"## Notes\n",
"\n",
"- **`-1` in `data_center_usdm_drought_dc_week.worst_dm` means \"observed week, no drought polygon contained the DC\"** (i.e., D-nothing). Filter `worst_dm >= 0` for actual drought weeks.\n",
"- **`pct_weeks_in_d2_or_worse` is the headline metric** for site selection. D2 = \"Severe Drought\" — the threshold at which water restrictions typically kick in.\n",
"- **Streak metrics** count consecutive weekly observations meeting a threshold. USDM is weekly so the unit is weeks.\n",
"- **`usdm_status='no_coverage'`** identifies DCs outside USDM coverage (e.g., far-overseas territories). The CONUS DCs are all covered; PR is covered by USDM.\n",
"- The `data_center_usdm_drought_dc_week` table is the long-form per-week record. Useful for time-series analyses, but if you only need the per-DC summary you can drop it.\n"
]
},
{
"cell_type": "markdown",
"id": "16",
"metadata": {},
"source": [
"## Tables Created by This Notebook and Their Relationships\n",
"\n",
"### Tables Created / Maintained\n",
"1. `public.usdm_drought_weekly`\n",
"- Weekly USDM drought polygons by `week_date` and drought category.\n",
"\n",
"2. `public.data_center_usdm_drought_dc_week`\n",
"- One row per `(master_id, week_date)` with weekly worst drought category at each data center.\n",
"\n",
"3. `public.data_center_usdm_drought_exposure`\n",
"- One row per `master_id` with summary drought-exposure metrics and streak fields.\n",
"\n",
"### Key Relationships\n",
"- `public.usdm_drought_weekly (week_date, dm_category, geom)`\n",
" - spatial/time source for -> `public.data_center_usdm_drought_dc_week`\n",
"\n",
"- `public.master_data_centers (master_id)`\n",
" - 1-to-many -> `public.data_center_usdm_drought_dc_week (master_id, week_date)`\n",
" - 1-to-1 (effective) -> `public.data_center_usdm_drought_exposure (master_id)`\n",
"\n",
"- `public.data_center_usdm_drought_dc_week`\n",
" - many-to-1 summary rollup -> `public.data_center_usdm_drought_exposure`\n",
"\n",
"### Rerun Notes\n",
"- Supports repeat runs when new USDM weeks or new data centers are added.\n",
"- Weekly table can be reloaded and the downstream `dc_week` + `exposure` tables can be recomputed from that source."
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}