diff --git a/.claude/settings.json b/.claude/settings.json
new file mode 100644
index 0000000..090799c
--- /dev/null
+++ b/.claude/settings.json
@@ -0,0 +1,7 @@
+{
+ "permissions": {
+ "allow": [
+ "Bash(python3 -c ' *)"
+ ]
+ }
+}
diff --git a/build_fcc_bdc_broadband_connection_table.ipynb b/build_fcc_bdc_broadband_connection_table.ipynb
index 635a9ce..3a9056f 100644
--- a/build_fcc_bdc_broadband_connection_table.ipynb
+++ b/build_fcc_bdc_broadband_connection_table.ipynb
@@ -820,16 +820,8 @@
" from {FILES_TABLE}\n",
" where as_of_date = %s\n",
" and category = 'Summary'\n",
- " and subcategory in (\n",
- " 'Summary by Geography Type - Other Geographies',\n",
- " 'Summary by Geography Type - Census Place'\n",
- " )\n",
- " order by case subcategory\n",
- " when 'Summary by Geography Type - Other Geographies' then 0\n",
- " when 'Summary by Geography Type - Census Place' then 1\n",
- " else 2\n",
- " end,\n",
- " file_id\n",
+ " and subcategory = 'Summary by Geography Type - Other Geographies'\n",
+ " order by file_id\n",
" \"\"\",\n",
" (as_of_date,),\n",
" )\n",
@@ -1207,8 +1199,9 @@
" when coalesce((fcc_summary_json ->> 'speed_02_02')::numeric, 0) > 0 then 0.2\n",
" else null\n",
" end,\n",
+ " fcc_100_20_provider_count = null,\n",
" fcc_bdc_status = case\n",
- " when fcc_bdc_status = 'fcc_summary_joined' then 'fcc_summary_joined_derived'\n",
+ " when fcc_bdc_status = 'fcc_summary_joined' then 'fcc_summary_joined_county'\n",
" else fcc_bdc_status\n",
" end,\n",
" updated_at = now()\n",
@@ -1218,7 +1211,7 @@
" )\n",
" derived_rows = cur.rowcount\n",
"\n",
- "print(f'Derived scalar columns for rows: {derived_rows:,}')"
+ "print(f'Derived county-summary speed scalar columns for rows: {derived_rows:,}')"
]
},
{
@@ -1363,10 +1356,10 @@
" )\n",
" update {CONNECTION_TABLE} c\n",
" set\n",
- " fcc_provider_count = s.provider_count,\n",
- " fcc_fiber_provider_count = s.fiber_provider_count,\n",
- " fcc_cable_provider_count = s.cable_provider_count,\n",
- " fcc_fixed_wireless_provider_count = s.fixed_wireless_provider_count,\n",
+ " fcc_provider_count = null,\n",
+ " fcc_fiber_provider_count = null,\n",
+ " fcc_cable_provider_count = null,\n",
+ " fcc_fixed_wireless_provider_count = null,\n",
" fcc_summary_json = jsonb_set(\n",
" coalesce(c.fcc_summary_json, '{{}}'::jsonb),\n",
" '{{provider_summary}}',\n",
@@ -1376,7 +1369,8 @@
" 'fiber_provider_count', s.fiber_provider_count,\n",
" 'cable_provider_count', s.cable_provider_count,\n",
" 'fixed_wireless_provider_count', s.fixed_wireless_provider_count,\n",
- " 'copper_provider_count', s.copper_provider_count\n",
+ " 'copper_provider_count', s.copper_provider_count,\n",
+ " 'scope', 'global_catalog_not_data_center_specific'\n",
" ),\n",
" true\n",
" ),\n",
@@ -1472,11 +1466,11 @@
"source": [
"## Phase 2B: Derive Scalar Broadband Columns from Summary JSON\n",
"\n",
- "This step derives scalar speed columns from `fcc_summary_json` for easier SQL use.\n",
+ "This step derives speed scalar columns from county-summary coverage shares and leaves provider-count columns null unless location- or tract-keyed provider data are available.\n",
"\n",
"Notes:\n",
- "- `fcc_max_advertised_download_mbps` / `fcc_max_advertised_upload_mbps` are estimated from the highest speed tier with non-zero availability share.\n",
- "- Provider-count columns are populated from the separate provider-summary catalog, which is global catalog context rather than geography-specific broadband coverage."
+ "- `fcc_max_advertised_download_mbps` / `fcc_max_advertised_upload_mbps` are county-summary indicators derived from the highest speed tier with non-zero availability share.\n",
+ "- Provider-summary aggregates are stored in `fcc_summary_json -> provider_summary` with `scope = global_catalog_not_data_center_specific`; scalar provider-count columns stay null until a location- or tract-keyed build is added."
]
},
{
diff --git a/build_fcc_bdc_location_provider_aggregates.py b/build_fcc_bdc_location_provider_aggregates.py
new file mode 100644
index 0000000..c578a36
--- /dev/null
+++ b/build_fcc_bdc_location_provider_aggregates.py
@@ -0,0 +1,806 @@
+#!/usr/bin/env python3
+"""Build FCC BDC provider aggregates for data-center counties and tracts.
+
+This script uses FCC BDC State / Location Coverage files. Those files are
+provider/location-level and include block GEOIDs, so they can be aggregated to
+county and tract provider counts for only the geographies that contain data
+centers.
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import tempfile
+import zipfile
+from collections.abc import Iterable
+from datetime import date
+from pathlib import Path
+from typing import Any
+
+import pandas as pd
+import requests
+from psycopg2.extras import execute_values
+
+from build_fcc_bdc_broadband_connection_table import (
+ CONNECTION_TABLE,
+ FCC_BASE_URL,
+ FILES_TABLE,
+ USER_AGENT,
+ fcc_credentials,
+ get_conn,
+ load_zsh_secrets,
+ parse_date,
+ require_env,
+)
+
+
+DETAIL_TABLE = "public.fcc_bdc_location_provider_geography_provider"
+AGG_TABLE = "public.fcc_bdc_location_provider_aggregate"
+PROGRESS_TABLE = "public.fcc_bdc_location_provider_file_progress"
+CROSSWALK_TABLE = "public.fcc_bdc_geoid_crosswalk"
+
+TERRESTRIAL_TECHNOLOGY_CODES = ("10", "40", "50", "70", "71", "72")
+FIXED_WIRELESS_CODES = {"70", "71", "72"}
+
+CSV_USECOLS = [
+ "provider_id",
+ "block_geoid",
+ "technology",
+ "max_advertised_download_speed",
+ "max_advertised_upload_speed",
+ "business_residential_code",
+]
+
+CT_PLANNING_TO_LEGACY_TRACT_GEOIDS = {
+ "09110520302": "09003520302",
+ "09120090500": "09001090500",
+ "09170175800": "09009175800",
+ "09190020101": "09001020101",
+ "09190020900": "09001020900",
+ "09190044300": "09001044300",
+}
+
+
+def fcc_download_headers() -> dict[str, str]:
+ username, hash_value = fcc_credentials()
+ if not username or not hash_value:
+ raise RuntimeError(
+ "FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus "
+ "FCC_API_KEY or FCC_HASH_VALUE."
+ )
+ return {
+ "username": username,
+ "hash_value": hash_value,
+ "user-agent": USER_AGENT,
+ "accept": "application/zip,*/*",
+ }
+
+
+def normalize_codes(values: Iterable[str]) -> tuple[str, ...]:
+ return tuple(str(v).strip() for v in values if str(v).strip())
+
+
+def create_tables(cur) -> None:
+ cur.execute(
+ f"""
+ create table if not exists {DETAIL_TABLE} (
+ as_of_date date not null,
+ file_id bigint not null,
+ geography_type text not null check (geography_type in ('County', 'Tract')),
+ geoid text not null,
+ provider_id bigint not null,
+ has_fiber boolean not null default false,
+ has_cable boolean not null default false,
+ has_fixed_wireless boolean not null default false,
+ has_copper boolean not null default false,
+ has_100_20 boolean not null default false,
+ has_business boolean not null default false,
+ has_business_fiber boolean not null default false,
+ has_business_100_20 boolean not null default false,
+ max_advertised_download_mbps numeric,
+ max_advertised_upload_mbps numeric,
+ matched_location_rows bigint not null default 0,
+ updated_at timestamptz not null default now(),
+ primary key (as_of_date, file_id, geography_type, geoid, provider_id)
+ )
+ """
+ )
+ cur.execute(
+ f"create index if not exists fcc_bdc_location_provider_geo_idx "
+ f"on {DETAIL_TABLE} (as_of_date, geography_type, geoid)"
+ )
+
+ cur.execute(
+ f"""
+ create table if not exists {AGG_TABLE} (
+ as_of_date date not null,
+ geography_type text not null check (geography_type in ('County', 'Tract')),
+ geoid text not null,
+ provider_count integer not null,
+ fiber_provider_count integer not null,
+ cable_provider_count integer not null,
+ fixed_wireless_provider_count integer not null,
+ copper_provider_count integer not null,
+ provider_100_20_count integer not null,
+ business_provider_count integer not null,
+ business_fiber_provider_count integer not null,
+ business_100_20_provider_count integer not null,
+ max_advertised_download_mbps numeric,
+ max_advertised_upload_mbps numeric,
+ matched_location_rows bigint not null,
+ provider_file_rows bigint not null,
+ updated_at timestamptz not null default now(),
+ primary key (as_of_date, geography_type, geoid)
+ )
+ """
+ )
+
+ cur.execute(
+ f"""
+ create table if not exists {PROGRESS_TABLE} (
+ as_of_date date not null,
+ file_id bigint not null,
+ state_fips text not null,
+ technology_code text,
+ technology_code_desc text,
+ record_count bigint,
+ matched_location_rows bigint not null,
+ provider_geo_rows bigint not null,
+ processed_at timestamptz not null default now(),
+ primary key (as_of_date, file_id)
+ )
+ """
+ )
+
+ cur.execute(
+ f"""
+ create table if not exists {CROSSWALK_TABLE} (
+ source_geography_type text not null,
+ source_geoid text not null,
+ fcc_geography_type text not null,
+ fcc_geoid text not null,
+ method text not null,
+ notes text,
+ updated_at timestamptz not null default now(),
+ primary key (source_geography_type, source_geoid, fcc_geography_type)
+ )
+ """
+ )
+
+ add_columns = [
+ "fcc_provider_geography_type text",
+ "fcc_provider_geoid text",
+ "fcc_county_provider_count integer",
+ "fcc_county_fiber_provider_count integer",
+ "fcc_county_cable_provider_count integer",
+ "fcc_county_fixed_wireless_provider_count integer",
+ "fcc_county_100_20_provider_count integer",
+ "fcc_county_business_provider_count integer",
+ "fcc_county_business_fiber_provider_count integer",
+ "fcc_county_business_100_20_provider_count integer",
+ "fcc_county_max_advertised_download_mbps numeric",
+ "fcc_county_max_advertised_upload_mbps numeric",
+ "fcc_tract_provider_count integer",
+ "fcc_tract_fiber_provider_count integer",
+ "fcc_tract_cable_provider_count integer",
+ "fcc_tract_fixed_wireless_provider_count integer",
+ "fcc_tract_100_20_provider_count integer",
+ "fcc_tract_business_provider_count integer",
+ "fcc_tract_business_fiber_provider_count integer",
+ "fcc_tract_business_100_20_provider_count integer",
+ "fcc_tract_max_advertised_download_mbps numeric",
+ "fcc_tract_max_advertised_upload_mbps numeric",
+ ]
+ for definition in add_columns:
+ cur.execute(f"alter table {CONNECTION_TABLE} add column if not exists {definition}")
+
+
+def seed_geoid_crosswalk(cur) -> None:
+ values = [
+ (
+ "Tract",
+ source_geoid,
+ "Tract",
+ fcc_geoid,
+ "ct_planning_region_to_legacy_county_same_tractce",
+ "Connecticut 2024 tract GEOIDs use planning-region county equivalents; FCC BDC block GEOIDs use legacy county codes.",
+ )
+ for source_geoid, fcc_geoid in CT_PLANNING_TO_LEGACY_TRACT_GEOIDS.items()
+ ]
+ execute_values(
+ cur,
+ f"""
+ insert into {CROSSWALK_TABLE} (
+ source_geography_type, source_geoid, fcc_geography_type,
+ fcc_geoid, method, notes
+ )
+ values %s
+ on conflict (source_geography_type, source_geoid, fcc_geography_type)
+ do update set
+ fcc_geoid = excluded.fcc_geoid,
+ method = excluded.method,
+ notes = excluded.notes,
+ updated_at = now()
+ """,
+ values,
+ )
+
+
+def latest_catalog_date(cur) -> date:
+ cur.execute(f"select max(as_of_date) from {FILES_TABLE}")
+ value = cur.fetchone()[0]
+ if value is None:
+ raise RuntimeError(f"No FCC catalog rows found in {FILES_TABLE}. Run the FCC catalog load first.")
+ return value
+
+
+def target_geographies(cur, states: tuple[str, ...] | None = None) -> tuple[set[str], set[str], set[str]]:
+ state_filter = ""
+ params: list[Any] = []
+ if states:
+ state_filter = "where left(census_tract_geoid, 2) = any(%s)"
+ params.append(list(states))
+ cur.execute(
+ f"""
+ select distinct
+ left(census_tract_geoid, 2) as state_fips,
+ left(census_tract_geoid, 5) as county_geoid,
+ left(census_tract_geoid, 11) as tract_geoid
+ from {CONNECTION_TABLE}
+ {state_filter}
+ """,
+ params,
+ )
+ rows = cur.fetchall()
+ states_found = {r[0] for r in rows if r[0]}
+ counties = {r[1] for r in rows if r[1]}
+ tracts = {r[2] for r in rows if r[2]}
+
+ if tracts:
+ cur.execute(
+ f"""
+ select fcc_geoid
+ from {CROSSWALK_TABLE}
+ where source_geography_type = 'Tract'
+ and fcc_geography_type = 'Tract'
+ and source_geoid = any(%s)
+ """,
+ (list(tracts),),
+ )
+ fcc_tracts = {r[0] for r in cur.fetchall() if r[0]}
+ tracts.update(fcc_tracts)
+ counties.update({geoid[:5] for geoid in fcc_tracts})
+
+ return states_found, counties, tracts
+
+
+def catalog_files(
+ cur,
+ as_of_date: date,
+ states: set[str],
+ technology_codes: tuple[str, ...],
+ limit: int | None,
+) -> list[dict[str, Any]]:
+ cur.execute(
+ f"""
+ select file_id, state_fips, technology_code, technology_code_desc, file_name, record_count
+ from {FILES_TABLE}
+ where as_of_date = %s
+ and category = 'State'
+ and subcategory = 'Location Coverage'
+ and state_fips = any(%s)
+ and technology_code = any(%s)
+ order by state_fips, technology_code, file_id
+ """,
+ (as_of_date, list(states), list(technology_codes)),
+ )
+ rows = [
+ {
+ "file_id": int(file_id),
+ "state_fips": state_fips,
+ "technology_code": str(technology_code),
+ "technology_code_desc": technology_code_desc,
+ "file_name": file_name,
+ "record_count": record_count,
+ }
+ for file_id, state_fips, technology_code, technology_code_desc, file_name, record_count in cur.fetchall()
+ ]
+ return rows[:limit] if limit is not None else rows
+
+
+def progress_done(cur, as_of_date: date, file_id: int) -> bool:
+ cur.execute(
+ f"select 1 from {PROGRESS_TABLE} where as_of_date = %s and file_id = %s",
+ (as_of_date, file_id),
+ )
+ return cur.fetchone() is not None
+
+
+def download_file(file_id: int, dest_dir: Path) -> Path:
+ url = f"{FCC_BASE_URL}/map/downloads/downloadFile/availability/{file_id}"
+ path = dest_dir / f"fcc_bdc_availability_{file_id}.zip"
+ with requests.get(url, headers=fcc_download_headers(), stream=True, timeout=(15, 300)) as response:
+ response.raise_for_status()
+ with path.open("wb") as fh:
+ for chunk in response.iter_content(chunk_size=1024 * 1024):
+ if chunk:
+ fh.write(chunk)
+ return path
+
+
+def normalize_block_geoid(series: pd.Series) -> pd.Series:
+ return series.astype("string").str.replace(r"\.0$", "", regex=True).str.zfill(15)
+
+
+def summarize_matches(
+ chunk: pd.DataFrame,
+ geography_type: str,
+ target_geoids: set[str],
+) -> tuple[pd.DataFrame, int]:
+ geoid_len = 5 if geography_type == "County" else 11
+ geoid = chunk["block_geoid_norm"].str[:geoid_len]
+ matched = chunk[geoid.isin(target_geoids)].copy()
+ if matched.empty:
+ return pd.DataFrame(), 0
+
+ matched["geoid"] = geoid[matched.index]
+ matched["provider_id_num"] = pd.to_numeric(matched["provider_id"], errors="coerce")
+ matched = matched[matched["provider_id_num"].notna()].copy()
+ if matched.empty:
+ return pd.DataFrame(), 0
+
+ tech = matched["technology"].astype("string").str.replace(r"\.0$", "", regex=True)
+ down = pd.to_numeric(matched["max_advertised_download_speed"], errors="coerce")
+ upload = pd.to_numeric(matched["max_advertised_upload_speed"], errors="coerce")
+ business_code = matched["business_residential_code"].astype("string").str.upper().fillna("")
+ business = business_code.isin(["B", "X"])
+
+ matched["provider_id_num"] = matched["provider_id_num"].astype("int64")
+ matched["has_fiber"] = tech.eq("50")
+ matched["has_cable"] = tech.eq("40")
+ matched["has_fixed_wireless"] = tech.isin(FIXED_WIRELESS_CODES)
+ matched["has_copper"] = tech.eq("10")
+ matched["has_100_20"] = down.ge(100) & upload.ge(20)
+ matched["has_business"] = business
+ matched["has_business_fiber"] = business & matched["has_fiber"]
+ matched["has_business_100_20"] = business & matched["has_100_20"]
+ matched["max_down"] = down
+ matched["max_up"] = upload
+ matched["matched_location_rows"] = 1
+
+ grouped = (
+ matched.groupby(["geoid", "provider_id_num"], as_index=False)
+ .agg(
+ has_fiber=("has_fiber", "max"),
+ has_cable=("has_cable", "max"),
+ has_fixed_wireless=("has_fixed_wireless", "max"),
+ has_copper=("has_copper", "max"),
+ has_100_20=("has_100_20", "max"),
+ has_business=("has_business", "max"),
+ has_business_fiber=("has_business_fiber", "max"),
+ has_business_100_20=("has_business_100_20", "max"),
+ max_down=("max_down", "max"),
+ max_up=("max_up", "max"),
+ matched_location_rows=("matched_location_rows", "sum"),
+ )
+ )
+ return grouped, len(matched)
+
+
+def upsert_detail(
+ cur,
+ as_of_date: date,
+ file_id: int,
+ geography_type: str,
+ grouped: pd.DataFrame,
+) -> int:
+ if grouped.empty:
+ return 0
+
+ values = [
+ (
+ as_of_date,
+ file_id,
+ geography_type,
+ row.geoid,
+ int(row.provider_id_num),
+ bool(row.has_fiber),
+ bool(row.has_cable),
+ bool(row.has_fixed_wireless),
+ bool(row.has_copper),
+ bool(row.has_100_20),
+ bool(row.has_business),
+ bool(row.has_business_fiber),
+ bool(row.has_business_100_20),
+ None if pd.isna(row.max_down) else float(row.max_down),
+ None if pd.isna(row.max_up) else float(row.max_up),
+ int(row.matched_location_rows),
+ )
+ for row in grouped.itertuples(index=False)
+ ]
+ execute_values(
+ cur,
+ f"""
+ insert into {DETAIL_TABLE} (
+ as_of_date, file_id, geography_type, geoid, provider_id,
+ has_fiber, has_cable, has_fixed_wireless, has_copper,
+ has_100_20, has_business, has_business_fiber, has_business_100_20,
+ max_advertised_download_mbps, max_advertised_upload_mbps,
+ matched_location_rows
+ )
+ values %s
+ on conflict (as_of_date, file_id, geography_type, geoid, provider_id)
+ do update set
+ has_fiber = {DETAIL_TABLE}.has_fiber or excluded.has_fiber,
+ has_cable = {DETAIL_TABLE}.has_cable or excluded.has_cable,
+ has_fixed_wireless = {DETAIL_TABLE}.has_fixed_wireless or excluded.has_fixed_wireless,
+ has_copper = {DETAIL_TABLE}.has_copper or excluded.has_copper,
+ has_100_20 = {DETAIL_TABLE}.has_100_20 or excluded.has_100_20,
+ has_business = {DETAIL_TABLE}.has_business or excluded.has_business,
+ has_business_fiber = {DETAIL_TABLE}.has_business_fiber or excluded.has_business_fiber,
+ has_business_100_20 = {DETAIL_TABLE}.has_business_100_20 or excluded.has_business_100_20,
+ max_advertised_download_mbps = greatest(
+ {DETAIL_TABLE}.max_advertised_download_mbps,
+ excluded.max_advertised_download_mbps
+ ),
+ max_advertised_upload_mbps = greatest(
+ {DETAIL_TABLE}.max_advertised_upload_mbps,
+ excluded.max_advertised_upload_mbps
+ ),
+ matched_location_rows = {DETAIL_TABLE}.matched_location_rows + excluded.matched_location_rows,
+ updated_at = now()
+ """,
+ values,
+ page_size=1000,
+ )
+ return len(values)
+
+
+def process_file(
+ conn,
+ file_row: dict[str, Any],
+ as_of_date: date,
+ county_geoids: set[str],
+ tract_geoids: set[str],
+ chunksize: int,
+ temp_dir: Path,
+) -> tuple[int, int]:
+ file_id = file_row["file_id"]
+ zip_path = download_file(file_id, temp_dir)
+ matched_rows = 0
+ provider_geo_rows = 0
+
+ try:
+ with zipfile.ZipFile(zip_path) as archive:
+ csv_names = [name for name in archive.namelist() if name.lower().endswith(".csv")]
+ if not csv_names:
+ raise RuntimeError(f"FCC file_id={file_id} did not contain a CSV: {archive.namelist()}")
+ with archive.open(csv_names[0]) as csv_file:
+ reader = pd.read_csv(
+ csv_file,
+ usecols=CSV_USECOLS,
+ dtype="string",
+ chunksize=chunksize,
+ low_memory=False,
+ )
+ with conn.cursor() as cur:
+ cur.execute(
+ f"delete from {DETAIL_TABLE} where as_of_date = %s and file_id = %s",
+ (as_of_date, file_id),
+ )
+ cur.execute(
+ f"delete from {PROGRESS_TABLE} where as_of_date = %s and file_id = %s",
+ (as_of_date, file_id),
+ )
+ conn.commit()
+
+ for chunk_number, chunk in enumerate(reader, start=1):
+ chunk["block_geoid_norm"] = normalize_block_geoid(chunk["block_geoid"])
+
+ county_grouped, county_matches = summarize_matches(chunk, "County", county_geoids)
+ tract_grouped, tract_matches = summarize_matches(chunk, "Tract", tract_geoids)
+
+ with conn.cursor() as cur:
+ provider_geo_rows += upsert_detail(cur, as_of_date, file_id, "County", county_grouped)
+ provider_geo_rows += upsert_detail(cur, as_of_date, file_id, "Tract", tract_grouped)
+ conn.commit()
+
+ matched_rows += county_matches + tract_matches
+ if matched_rows and chunk_number % 10 == 0:
+ print(f" file_id={file_id}: chunk {chunk_number:,}, matched row-events={matched_rows:,}")
+
+ with conn.cursor() as cur:
+ cur.execute(
+ f"""
+ insert into {PROGRESS_TABLE} (
+ as_of_date, file_id, state_fips, technology_code,
+ technology_code_desc, record_count, matched_location_rows, provider_geo_rows
+ )
+ values (%s, %s, %s, %s, %s, %s, %s, %s)
+ on conflict (as_of_date, file_id) do update set
+ state_fips = excluded.state_fips,
+ technology_code = excluded.technology_code,
+ technology_code_desc = excluded.technology_code_desc,
+ record_count = excluded.record_count,
+ matched_location_rows = excluded.matched_location_rows,
+ provider_geo_rows = excluded.provider_geo_rows,
+ processed_at = now()
+ """,
+ (
+ as_of_date,
+ file_id,
+ file_row["state_fips"],
+ file_row["technology_code"],
+ file_row["technology_code_desc"],
+ file_row["record_count"],
+ matched_rows,
+ provider_geo_rows,
+ ),
+ )
+ conn.commit()
+ return matched_rows, provider_geo_rows
+ finally:
+ zip_path.unlink(missing_ok=True)
+
+
+def rebuild_aggregate(cur, as_of_date: date) -> int:
+ cur.execute(f"delete from {AGG_TABLE} where as_of_date = %s", (as_of_date,))
+ cur.execute(
+ f"""
+ insert into {AGG_TABLE} (
+ as_of_date, geography_type, geoid,
+ provider_count, fiber_provider_count, cable_provider_count,
+ fixed_wireless_provider_count, copper_provider_count,
+ provider_100_20_count, business_provider_count,
+ business_fiber_provider_count, business_100_20_provider_count,
+ max_advertised_download_mbps, max_advertised_upload_mbps,
+ matched_location_rows, provider_file_rows
+ )
+ with per_provider as (
+ select
+ as_of_date,
+ geography_type,
+ geoid,
+ provider_id,
+ bool_or(has_fiber) as has_fiber,
+ bool_or(has_cable) as has_cable,
+ bool_or(has_fixed_wireless) as has_fixed_wireless,
+ bool_or(has_copper) as has_copper,
+ bool_or(has_100_20) as has_100_20,
+ bool_or(has_business) as has_business,
+ bool_or(has_business_fiber) as has_business_fiber,
+ bool_or(has_business_100_20) as has_business_100_20,
+ max(max_advertised_download_mbps) as max_advertised_download_mbps,
+ max(max_advertised_upload_mbps) as max_advertised_upload_mbps,
+ sum(matched_location_rows) as matched_location_rows,
+ count(*) as provider_file_rows
+ from {DETAIL_TABLE}
+ where as_of_date = %s
+ group by 1, 2, 3, 4
+ )
+ select
+ as_of_date,
+ geography_type,
+ geoid,
+ count(*)::integer as provider_count,
+ count(*) filter (where has_fiber)::integer as fiber_provider_count,
+ count(*) filter (where has_cable)::integer as cable_provider_count,
+ count(*) filter (where has_fixed_wireless)::integer as fixed_wireless_provider_count,
+ count(*) filter (where has_copper)::integer as copper_provider_count,
+ count(*) filter (where has_100_20)::integer as provider_100_20_count,
+ count(*) filter (where has_business)::integer as business_provider_count,
+ count(*) filter (where has_business_fiber)::integer as business_fiber_provider_count,
+ count(*) filter (where has_business_100_20)::integer as business_100_20_provider_count,
+ max(max_advertised_download_mbps) as max_advertised_download_mbps,
+ max(max_advertised_upload_mbps) as max_advertised_upload_mbps,
+ sum(matched_location_rows)::bigint as matched_location_rows,
+ sum(provider_file_rows)::bigint as provider_file_rows
+ from per_provider
+ group by 1, 2, 3
+ """,
+ (as_of_date,),
+ )
+ return cur.rowcount
+
+
+def update_connection_table(cur, as_of_date: date) -> int:
+ cur.execute(
+ f"""
+ with joined as (
+ select
+ c.master_id,
+ coalesce(x.fcc_geoid, left(c.census_tract_geoid, 11)) as provider_tract_geoid,
+ coalesce(left(x.fcc_geoid, 5), left(c.census_tract_geoid, 5)) as provider_county_geoid,
+ county.geoid as county_geoid,
+ tract.geoid as tract_geoid,
+ county.provider_count as county_provider_count,
+ county.fiber_provider_count as county_fiber_provider_count,
+ county.cable_provider_count as county_cable_provider_count,
+ county.fixed_wireless_provider_count as county_fixed_wireless_provider_count,
+ county.provider_100_20_count as county_100_20_provider_count,
+ county.business_provider_count as county_business_provider_count,
+ county.business_fiber_provider_count as county_business_fiber_provider_count,
+ county.business_100_20_provider_count as county_business_100_20_provider_count,
+ county.max_advertised_download_mbps as county_max_down,
+ county.max_advertised_upload_mbps as county_max_up,
+ tract.provider_count as tract_provider_count,
+ tract.fiber_provider_count as tract_fiber_provider_count,
+ tract.cable_provider_count as tract_cable_provider_count,
+ tract.fixed_wireless_provider_count as tract_fixed_wireless_provider_count,
+ tract.provider_100_20_count as tract_100_20_provider_count,
+ tract.business_provider_count as tract_business_provider_count,
+ tract.business_fiber_provider_count as tract_business_fiber_provider_count,
+ tract.business_100_20_provider_count as tract_business_100_20_provider_count,
+ tract.max_advertised_download_mbps as tract_max_down,
+ tract.max_advertised_upload_mbps as tract_max_up
+ from {CONNECTION_TABLE} c
+ left join {CROSSWALK_TABLE} x
+ on x.source_geography_type = 'Tract'
+ and x.fcc_geography_type = 'Tract'
+ and x.source_geoid = c.census_tract_geoid
+ left join {AGG_TABLE} county
+ on county.as_of_date = %s
+ and county.geography_type = 'County'
+ and county.geoid = coalesce(left(x.fcc_geoid, 5), left(c.census_tract_geoid, 5))
+ left join {AGG_TABLE} tract
+ on tract.as_of_date = %s
+ and tract.geography_type = 'Tract'
+ and tract.geoid = coalesce(x.fcc_geoid, left(c.census_tract_geoid, 11))
+ )
+ update {CONNECTION_TABLE} c
+ set
+ fcc_provider_geography_type = case
+ when j.tract_geoid is not null then 'Tract'
+ when j.county_geoid is not null then 'County'
+ else c.fcc_provider_geography_type
+ end,
+ fcc_provider_geoid = coalesce(j.tract_geoid, j.county_geoid, c.fcc_provider_geoid),
+ fcc_provider_count = coalesce(j.tract_provider_count, j.county_provider_count),
+ fcc_fiber_provider_count = coalesce(j.tract_fiber_provider_count, j.county_fiber_provider_count),
+ fcc_cable_provider_count = coalesce(j.tract_cable_provider_count, j.county_cable_provider_count),
+ fcc_fixed_wireless_provider_count = coalesce(j.tract_fixed_wireless_provider_count, j.county_fixed_wireless_provider_count),
+ fcc_100_20_provider_count = coalesce(j.tract_100_20_provider_count, j.county_100_20_provider_count),
+ fcc_max_advertised_download_mbps = coalesce(j.tract_max_down, j.county_max_down, c.fcc_max_advertised_download_mbps),
+ fcc_max_advertised_upload_mbps = coalesce(j.tract_max_up, j.county_max_up, c.fcc_max_advertised_upload_mbps),
+ fcc_county_provider_count = j.county_provider_count,
+ fcc_county_fiber_provider_count = j.county_fiber_provider_count,
+ fcc_county_cable_provider_count = j.county_cable_provider_count,
+ fcc_county_fixed_wireless_provider_count = j.county_fixed_wireless_provider_count,
+ fcc_county_100_20_provider_count = j.county_100_20_provider_count,
+ fcc_county_business_provider_count = j.county_business_provider_count,
+ fcc_county_business_fiber_provider_count = j.county_business_fiber_provider_count,
+ fcc_county_business_100_20_provider_count = j.county_business_100_20_provider_count,
+ fcc_county_max_advertised_download_mbps = j.county_max_down,
+ fcc_county_max_advertised_upload_mbps = j.county_max_up,
+ fcc_tract_provider_count = j.tract_provider_count,
+ fcc_tract_fiber_provider_count = j.tract_fiber_provider_count,
+ fcc_tract_cable_provider_count = j.tract_cable_provider_count,
+ fcc_tract_fixed_wireless_provider_count = j.tract_fixed_wireless_provider_count,
+ fcc_tract_100_20_provider_count = j.tract_100_20_provider_count,
+ fcc_tract_business_provider_count = j.tract_business_provider_count,
+ fcc_tract_business_fiber_provider_count = j.tract_business_fiber_provider_count,
+ fcc_tract_business_100_20_provider_count = j.tract_business_100_20_provider_count,
+ fcc_tract_max_advertised_download_mbps = j.tract_max_down,
+ fcc_tract_max_advertised_upload_mbps = j.tract_max_up,
+ fcc_summary_json = jsonb_set(
+ coalesce(c.fcc_summary_json, '{{}}'::jsonb),
+ '{{location_provider_aggregate}}',
+ jsonb_build_object(
+ 'source', 'fcc_state_location_coverage',
+ 'as_of_date', %s::text,
+ 'preferred_geography_type', case
+ when j.tract_geoid is not null then 'Tract'
+ when j.county_geoid is not null then 'County'
+ else null
+ end,
+ 'preferred_geoid', coalesce(j.tract_geoid, j.county_geoid),
+ 'county_geoid', j.county_geoid,
+ 'tract_geoid', j.tract_geoid
+ ),
+ true
+ ),
+ fcc_bdc_status = case
+ when coalesce(j.tract_geoid, j.county_geoid) is not null then 'fcc_location_provider_joined'
+ else c.fcc_bdc_status
+ end,
+ updated_at = now()
+ from joined j
+ where c.master_id = j.master_id
+ and coalesce(j.tract_geoid, j.county_geoid) is not null
+ """,
+ (as_of_date, as_of_date, as_of_date),
+ )
+ return cur.rowcount
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--as-of-date", help="FCC availability as-of date; defaults to latest catalog date.")
+ parser.add_argument("--states", nargs="*", help="Optional state FIPS list, e.g. 11 34 51.")
+ parser.add_argument("--technology-codes", nargs="*", default=list(TERRESTRIAL_TECHNOLOGY_CODES))
+ parser.add_argument("--limit-files", type=int, help="Process only the first N matching files.")
+ parser.add_argument("--chunksize", type=int, default=500_000)
+ parser.add_argument("--refresh", action="store_true", help="Delete existing location-provider rows for this as-of date first.")
+ parser.add_argument("--no-resume", action="store_true", help="Reprocess files even if marked complete.")
+ parser.add_argument("--no-update-connection", action="store_true", help="Build aggregate tables but do not update data_center_broadband_connection.")
+ args = parser.parse_args()
+
+ load_zsh_secrets()
+ require_env(["PGWEB_HOST", "PGWEB_PORT", "PGWEB_USER", "PGWEB_PASSWORD"])
+
+ as_of_date = parse_date(args.as_of_date) if args.as_of_date else None
+ if as_of_date is None and args.as_of_date:
+ raise RuntimeError(f"Invalid --as-of-date: {args.as_of_date}")
+
+ technology_codes = normalize_codes(args.technology_codes)
+ requested_states = tuple(s.zfill(2) for s in args.states) if args.states else None
+
+ with get_conn() as conn:
+ with conn.cursor() as cur:
+ create_tables(cur)
+ seed_geoid_crosswalk(cur)
+ as_of_date = as_of_date or latest_catalog_date(cur)
+ states, counties, tracts = target_geographies(cur, requested_states)
+ if not states:
+ raise RuntimeError("No target data-center states found.")
+ if args.refresh:
+ cur.execute(f"delete from {DETAIL_TABLE} where as_of_date = %s", (as_of_date,))
+ cur.execute(f"delete from {AGG_TABLE} where as_of_date = %s", (as_of_date,))
+ cur.execute(f"delete from {PROGRESS_TABLE} where as_of_date = %s", (as_of_date,))
+ files = catalog_files(cur, as_of_date, states, technology_codes, args.limit_files)
+ conn.commit()
+
+ print(f"FCC as_of_date: {as_of_date}")
+ print(f"Target states: {len(states):,} | counties: {len(counties):,} | tracts: {len(tracts):,}")
+ print(f"Location coverage files selected: {len(files):,}")
+
+ total_matched_rows = 0
+ total_provider_geo_rows = 0
+ with tempfile.TemporaryDirectory(prefix="fcc_bdc_location_") as temp:
+ temp_dir = Path(temp)
+ for idx, file_row in enumerate(files, start=1):
+ file_id = file_row["file_id"]
+ with conn.cursor() as cur:
+ skip = (not args.no_resume) and progress_done(cur, as_of_date, file_id)
+ if skip:
+ print(f"[{idx:,}/{len(files):,}] skip file_id={file_id} already processed")
+ continue
+
+ print(
+ f"[{idx:,}/{len(files):,}] file_id={file_id} state={file_row['state_fips']} "
+ f"tech={file_row['technology_code']} records={file_row['record_count']:,}"
+ )
+ matched_rows, provider_geo_rows = process_file(
+ conn,
+ file_row,
+ as_of_date,
+ counties,
+ tracts,
+ args.chunksize,
+ temp_dir,
+ )
+ total_matched_rows += matched_rows
+ total_provider_geo_rows += provider_geo_rows
+ print(
+ f" complete file_id={file_id}: matched row-events={matched_rows:,}, "
+ f"provider-geography rows={provider_geo_rows:,}"
+ )
+
+ with conn.cursor() as cur:
+ agg_rows = rebuild_aggregate(cur, as_of_date)
+ updated_rows = 0
+ if not args.no_update_connection:
+ updated_rows = update_connection_table(cur, as_of_date)
+ conn.commit()
+
+ print(f"New matched row-events this run: {total_matched_rows:,}")
+ print(f"New provider-geography detail rows this run: {total_provider_geo_rows:,}")
+ print(f"{AGG_TABLE}: rebuilt {agg_rows:,} geography rows")
+ if not args.no_update_connection:
+ print(f"{CONNECTION_TABLE}: updated {updated_rows:,} rows with location-provider aggregates")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/enhanced_data_center_cluster_map.ipynb b/enhanced_data_center_cluster_map.ipynb
index 134d1b9..d6b63bc 100644
--- a/enhanced_data_center_cluster_map.ipynb
+++ b/enhanced_data_center_cluster_map.ipynb
@@ -96,6 +96,7 @@
"SHOW_ELECTION_LAYER = True\n",
"SHOW_ELECTION_2020_LAYER = True\n",
"SHOW_ELECTION_2024_LAYER = False\n",
+ "SHOW_NRI_LAYER = True\n",
"\n",
"OUTPUT_DIR.mkdir(exist_ok=True)\n",
"print('points:', POINTS_CSV)\n",
@@ -103,7 +104,7 @@
"print('point context:', POINT_CONTEXT_CSV)\n",
"print('HUC8 GeoJSON:', HUC8_GEOJSON)\n",
"print('state energy context:', STATE_ENERGY_CSV)\n",
- "print('html output:', MAP_HTML)\n"
+ "print('html output:', MAP_HTML)"
]
},
{
@@ -191,6 +192,7 @@
"climate_context = pd.DataFrame()\n",
"broadband_context = pd.DataFrame()\n",
"election_context = pd.DataFrame()\n",
+ "nri_context = pd.DataFrame()\n",
"\n",
"\n",
"def load_zsh_secrets() -> None:\n",
@@ -227,7 +229,7 @@
"\n",
"def load_optional_db_layers() -> None:\n",
" global internet_cables_geojson, opposition_cases, drought_context, smoke_context\n",
- " global climate_context, broadband_context, election_context, points\n",
+ " global climate_context, broadband_context, election_context, nri_context, points\n",
"\n",
" if not ENABLE_DB_LAYER_LOAD:\n",
" print('DB layer load disabled')\n",
@@ -392,8 +394,37 @@
" cols = [c for c in election_context.columns if c != 'master_id']\n",
" points = points.merge(election_context[['master_id'] + cols], on='master_id', how='left')\n",
"\n",
+ " if SHOW_NRI_LAYER:\n",
+ " # FEMA National Risk Index (December 2025). Per-DC values come from the\n",
+ " # census tract that contains the DC point. We pull composite scores plus\n",
+ " # the per-hazard risk score for the 18 NRI hazards.\n",
+ " nri_sql = \"\"\"\n",
+ " select\n",
+ " master_id, nri_status, \"TRACTFIPS\" as nri_tractfips,\n",
+ " \"RISK_SCORE\" as nri_risk_score, \"RISK_RATNG\" as nri_risk_rating,\n",
+ " \"EAL_SCORE\" as nri_eal_score, \"EAL_RATNG\" as nri_eal_rating,\n",
+ " \"EAL_VALT\" as nri_eal_total_usd,\n",
+ " \"SOVI_SCORE\" as nri_sovi_score, \"SOVI_RATNG\" as nri_sovi_rating,\n",
+ " \"RESL_SCORE\" as nri_resl_score, \"RESL_RATNG\" as nri_resl_rating,\n",
+ " \"AVLN_RISKS\" as nri_avln_risk, \"CFLD_RISKS\" as nri_cfld_risk,\n",
+ " \"CWAV_RISKS\" as nri_cwav_risk, \"DRGT_RISKS\" as nri_drgt_risk,\n",
+ " \"ERQK_RISKS\" as nri_erqk_risk, \"HAIL_RISKS\" as nri_hail_risk,\n",
+ " \"HWAV_RISKS\" as nri_hwav_risk, \"HRCN_RISKS\" as nri_hrcn_risk,\n",
+ " \"ISTM_RISKS\" as nri_istm_risk, \"LNDS_RISKS\" as nri_lnds_risk,\n",
+ " \"LTNG_RISKS\" as nri_ltng_risk, \"IFLD_RISKS\" as nri_ifld_risk,\n",
+ " \"SWND_RISKS\" as nri_swnd_risk, \"TRND_RISKS\" as nri_trnd_risk,\n",
+ " \"TSUN_RISKS\" as nri_tsun_risk, \"VLCN_RISKS\" as nri_vlcn_risk,\n",
+ " \"WFIR_RISKS\" as nri_wfir_risk, \"WNTW_RISKS\" as nri_wntw_risk\n",
+ " from public.data_center_nri_exposure\n",
+ " \"\"\"\n",
+ " nri_context = pd.read_sql(nri_sql, conn)\n",
+ " print(f'nri_context rows: {len(nri_context):,}')\n",
+ " if not nri_context.empty:\n",
+ " cols = [c for c in nri_context.columns if c != 'master_id']\n",
+ " points = points.merge(nri_context[['master_id'] + cols], on='master_id', how='left')\n",
"\n",
- "load_optional_db_layers()\n"
+ "\n",
+ "load_optional_db_layers()"
]
},
{
@@ -408,6 +439,10 @@
"- `public.opposition_cases_geocoded` (point layer)\n",
"- `public.data_center_usdm_drought_exposure` (point popup enrichment)\n",
"- `public.data_center_hms_smoke_exposure` (point popup enrichment)\n",
+ "- `public.data_center_historical_climate` (climate stress layer + popup)\n",
+ "- `public.data_center_broadband_connection` (broadband capacity layer + popup)\n",
+ "- `public.data_center_rdh_precinct_vote_matches` (election context layer + popup)\n",
+ "- `public.data_center_nri_exposure` (FEMA NRI multi-hazard risk layer + popup)\n",
"\n",
"If DB credentials are unavailable, map generation still works with CSV/GeoJSON sources."
]
@@ -437,6 +472,19 @@
"INTERNET_CABLE_COLOR = '#7c3aed'\n",
"OPPOSITION_CASE_COLOR = '#b91c1c'\n",
"\n",
+ "# NRI hazard prefix -> human-readable label, used in the per-DC popup.\n",
+ "NRI_HAZARDS = [\n",
+ " ('avln', 'Avalanche'), ('cfld', 'Coastal flood'),\n",
+ " ('cwav', 'Cold wave'), ('drgt', 'Drought'),\n",
+ " ('erqk', 'Earthquake'), ('hail', 'Hail'),\n",
+ " ('hwav', 'Heat wave'), ('hrcn', 'Hurricane'),\n",
+ " ('istm', 'Ice storm'), ('lnds', 'Landslide'),\n",
+ " ('ltng', 'Lightning'), ('ifld', 'Inland flood'),\n",
+ " ('swnd', 'Strong wind'), ('trnd', 'Tornado'),\n",
+ " ('tsun', 'Tsunami'), ('vlcn', 'Volcanic activity'),\n",
+ " ('wfir', 'Wildfire'), ('wntw', 'Winter weather'),\n",
+ "]\n",
+ "\n",
"cluster_info = clusters.set_index('cluster_id').to_dict('index')\n",
"\n",
"\n",
@@ -517,6 +565,34 @@
" return '#6b7280'\n",
"\n",
"\n",
+ "def nri_color(risk_score):\n",
+ " \"\"\"FEMA NRI composite RISK_SCORE color ramp (0-100, higher = more risk).\"\"\"\n",
+ " if pd.isna(risk_score):\n",
+ " return '#94a3b8'\n",
+ " r = float(risk_score)\n",
+ " if r >= 80:\n",
+ " return '#7f1d1d'\n",
+ " if r >= 60:\n",
+ " return '#dc2626'\n",
+ " if r >= 40:\n",
+ " return '#ea580c'\n",
+ " if r >= 20:\n",
+ " return '#ca8a04'\n",
+ " return '#0284c7'\n",
+ "\n",
+ "\n",
+ "def top_nri_hazards(row, n=3):\n",
+ " \"\"\"Return the top-N hazards by risk score for this DC, as 'Label: score' strings.\"\"\"\n",
+ " pairs = []\n",
+ " for prefix, label in NRI_HAZARDS:\n",
+ " attr = f'nri_{prefix}_risk'\n",
+ " val = getattr(row, attr, None)\n",
+ " if val is not None and pd.notna(val):\n",
+ " pairs.append((label, float(val)))\n",
+ " pairs.sort(key=lambda p: p[1], reverse=True)\n",
+ " return [f'{label}: {score:.1f}' for label, score in pairs[:n]]\n",
+ "\n",
+ "\n",
"def point_popup(row):\n",
" cluster_label, cluster_size, cluster_rank = cluster_label_and_size(row.cluster_id)\n",
" nearest = row.nearest_neighbor_km\n",
@@ -643,6 +719,24 @@
" {election_2020_lines}\n",
" '''\n",
"\n",
+ " nri_lines = ''\n",
+ " if hasattr(row, 'nri_status') and pd.notna(row.nri_status):\n",
+ " top3 = top_nri_hazards(row, n=3)\n",
+ " top3_html = ('
'.join(top3)) if top3 else 'n/a'\n",
+ " nri_lines = f'''\n",
+ "