Files
data-centers/scripts/build_fcc_bdc_broadband_connection_table.py
dadams ee5856661a Reorganize project into scripts/, docs/, data/, output/ directories
Move all Python scripts to scripts/, documentation to docs/, raw input
data to data/, and generated HTML/CSV outputs to output/. Update path
references in 8 scripts to use Path(__file__).parent.parent as project
root so they work correctly from the new location. Update README links
and quick-start commands accordingly. Notebooks remain at root.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 21:57:47 -07:00

398 lines
12 KiB
Python

#!/usr/bin/env python3
"""Build data-center broadband connection tables.

Creates a per-data-center broadband connection table and, when FCC BDC API
credentials are available, stores the FCC BDC public download catalog.

Required DB env vars:
    PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD

FCC API env vars:
    FCC_USERNAME or FCC_BDC_USERNAME - FCC User Registration username/email
    FCC_API_KEY or FCC_HASH_VALUE - BDC public API hash_value token
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from datetime import date, datetime
from pathlib import Path
from typing import Any
import psycopg2
import requests
from psycopg2.extras import Json, execute_values
# Database name.
DB_NAME = "data_centers"
# Table the per-data-center base rows are selected from.
MASTER_TABLE = "public.master_data_centers"
# Census-tract table joined on geoid to supply broadband_subscription_pct.
TRACT_TABLE = "public.data_center_census_tracts_2024"
# Stores the rows returned by the FCC "listAsOfDates" endpoint.
AS_OF_TABLE = "public.fcc_bdc_api_as_of_dates"
# Stores the FCC availability download-file catalog, keyed by (as_of_date, file_id).
FILES_TABLE = "public.fcc_bdc_availability_files"
# Per-data-center output table that this script truncates and rebuilds.
CONNECTION_TABLE = "public.data_center_broadband_connection"
# Prefix prepended to every FCC API path requested by fcc_get().
FCC_BASE_URL = "https://broadbandmap.fcc.gov/api/public"
# Value sent in the "user-agent" header of FCC API requests.
USER_AGENT = "data-center-fcc-bdc-loader/1.0"
def load_zsh_secrets() -> None:
    """Copy variables defined by ``~/.zsh_secrets`` into ``os.environ``.

    The file is sourced inside a login ``zsh`` and the resulting environment
    is dumped with ``env``. The output is captured rather than echoed, so no
    secret value is printed. Variables that already exist in this process keep
    their current value. Does nothing when the secrets file is absent.

    Raises:
        subprocess.CalledProcessError: If the ``zsh`` command exits non-zero.
    """
    if not (Path.home() / ".zsh_secrets").exists():
        return

    completed = subprocess.run(
        ["zsh", "-lc", "source ~/.zsh_secrets >/dev/null 2>&1; env"],
        check=True,
        capture_output=True,
        text=True,
    )
    for entry in completed.stdout.splitlines():
        name, separator, setting = entry.partition("=")
        # Ignore lines that are not NAME=value pairs or that have an empty name.
        if not separator or not name:
            continue
        # setdefault leaves variables already set in this process untouched.
        os.environ.setdefault(name, setting)
def require_env(keys: list[str]) -> None:
    """Verify that every named environment variable has a non-empty value.

    Args:
        keys: Names of the environment variables that must be set.

    Raises:
        RuntimeError: Naming, in input order, each variable that is unset or
            set to an empty string.
    """
    absent = [name for name in keys if not os.environ.get(name)]
    if absent:
        names = ", ".join(absent)
        raise RuntimeError("Missing required env vars: " + names)
def get_conn():
    """Open a psycopg2 connection to the project database.

    Host, port, user, and password are read from the ``PGWEB_HOST``,
    ``PGWEB_PORT``, ``PGWEB_USER``, and ``PGWEB_PASSWORD`` environment
    variables; the database name comes from the module-level ``DB_NAME``.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        KeyError: If any of the ``PGWEB_*`` variables is unset.
    """
    return psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        # Use the shared constant rather than repeating its literal value.
        dbname=DB_NAME,
    )
def fcc_credentials() -> tuple[str | None, str | None]:
    """Read the FCC BDC API credentials from the environment.

    ``FCC_USERNAME`` is preferred over ``FCC_BDC_USERNAME`` and ``FCC_API_KEY``
    over ``FCC_HASH_VALUE``; the second name of each pair is consulted only
    when the first is unset or empty.

    Returns:
        A ``(username, hash_value)`` tuple built from those lookups.
    """
    username = os.getenv("FCC_USERNAME")
    if not username:
        username = os.getenv("FCC_BDC_USERNAME")

    hash_value = os.getenv("FCC_API_KEY")
    if not hash_value:
        hash_value = os.getenv("FCC_HASH_VALUE")

    return username, hash_value
def fcc_get(path: str, *, params: dict[str, Any] | None = None) -> dict[str, Any]:
    """Send an authenticated GET request to the FCC BDC public API.

    Args:
        path: Endpoint path appended directly to ``FCC_BASE_URL``.
        params: Optional query-string parameters.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If either credential is missing, or if the JSON body
            carries a ``status_code`` of 401/403 or a ``status`` of ``"fail"``.
        requests.HTTPError: If the HTTP status of the response is an error.
    """
    username, hash_value = fcc_credentials()
    if not (username and hash_value):
        raise RuntimeError(
            "FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus "
            "FCC_API_KEY or FCC_HASH_VALUE."
        )

    response = requests.get(
        f"{FCC_BASE_URL}{path}",
        headers={
            "username": username,
            "hash_value": hash_value,
            "user-agent": USER_AGENT,
            "accept": "application/json",
        },
        params=params if params else {},
        timeout=60,
    )
    response.raise_for_status()

    body = response.json()
    # The body itself is also checked for 401/403 or "fail" markers,
    # independently of the HTTP status handled by raise_for_status() above.
    denied = str(body.get("status_code")) in {"401", "403"}
    if denied or body.get("status") == "fail":
        raise RuntimeError(f"FCC API error for {path}: {body}")
    return body
def parse_date(value: Any) -> date | None:
    """Coerce *value* to a plain ``datetime.date``.

    Args:
        value: ``None``, an empty string, a ``date``, a ``datetime``, or any
            object whose ``str()`` starts with a ``YYYY-MM-DD`` date.

    Returns:
        ``None`` for ``None`` or ``""``; the calendar date of a ``datetime``;
        a ``date`` unchanged; otherwise the date parsed from the first ten
        characters of ``str(value)``.

    Raises:
        ValueError: If those ten characters are not a valid ``YYYY-MM-DD`` date.
    """
    if value in (None, ""):
        return None
    # datetime subclasses date, so it has to be tested first. Returning it
    # unchanged would break the declared return type and make comparisons
    # against plain dates (e.g. in max()) raise TypeError.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    # Slice to ten characters so trailing time/zone text is ignored.
    return datetime.strptime(str(value)[:10], "%Y-%m-%d").date()
def to_int(value: Any) -> int | None:
    """Convert *value* to an ``int``, tolerating thousands separators.

    Args:
        value: Any object; its ``str()`` form is parsed after commas are removed.

    Returns:
        The parsed integer, or ``None`` when *value* is ``None``, an empty
        string, or not parseable as an integer.
    """
    if value is None or value == "":
        return None
    try:
        parsed = int(str(value).replace(",", ""))
    except (TypeError, ValueError):
        return None
    else:
        return parsed
def create_tables(cur) -> None:
    """Create the PostGIS extension, the script's three tables, and their indexes.

    Every statement is guarded with ``if not exists``. Nothing is committed
    here; that is left to the caller.

    Args:
        cur: Open database cursor used to execute the DDL statements.
    """

    def ensure_index(name: str, target: str) -> None:
        # One place for the repeated "create index if not exists" statement.
        cur.execute(f"create index if not exists {name} on {target}")

    cur.execute("create extension if not exists postgis")

    # One row per (data_type, as_of_date) reported by the FCC API.
    cur.execute(
        f"""
        create table if not exists {AS_OF_TABLE} (
            data_type text not null,
            as_of_date date not null,
            raw jsonb not null,
            fetched_at timestamptz not null default now(),
            primary key (data_type, as_of_date)
        )
        """
    )

    # Catalog of downloadable availability files, keyed by (as_of_date, file_id).
    cur.execute(
        f"""
        create table if not exists {FILES_TABLE} (
            as_of_date date not null,
            file_id bigint not null,
            category text,
            subcategory text,
            technology_type text,
            technology_code text,
            technology_code_desc text,
            speed_tier text,
            state_fips text,
            state_name text,
            provider_id bigint,
            provider_name text,
            file_type text,
            file_name text,
            record_count bigint,
            raw jsonb not null,
            fetched_at timestamptz not null default now(),
            primary key (as_of_date, file_id)
        )
        """
    )
    ensure_index(
        "fcc_bdc_availability_files_category_idx",
        f"{FILES_TABLE} (category, subcategory)",
    )
    ensure_index(
        "fcc_bdc_availability_files_state_idx",
        f"{FILES_TABLE} (state_fips)",
    )
    ensure_index(
        "fcc_bdc_availability_files_provider_idx",
        f"{FILES_TABLE} (provider_id)",
    )

    # Per-data-center table; the foreign key targets MASTER_TABLE so the
    # reference stays in step with the table the rows are selected from.
    cur.execute(
        f"""
        create table if not exists {CONNECTION_TABLE} (
            master_id text primary key references {MASTER_TABLE}(master_id) on delete cascade,
            source text,
            name text,
            operator text,
            city text,
            state text,
            country text,
            longitude double precision,
            latitude double precision,
            geom geometry(Point, 4326),
            census_tract_geoid text,
            census_broadband_subscription_pct numeric,
            fcc_bdc_status text not null,
            fcc_bdc_as_of_date date,
            fcc_bdc_geography_type text,
            fcc_bdc_geoid text,
            fcc_provider_count integer,
            fcc_fiber_provider_count integer,
            fcc_cable_provider_count integer,
            fcc_fixed_wireless_provider_count integer,
            fcc_max_advertised_download_mbps numeric,
            fcc_max_advertised_upload_mbps numeric,
            fcc_100_20_provider_count integer,
            fcc_summary_json jsonb,
            fetched_at timestamptz not null default now(),
            updated_at timestamptz not null default now()
        )
        """
    )
    ensure_index(
        "data_center_broadband_connection_geom_gix",
        f"{CONNECTION_TABLE} using gist (geom)",
    )
    ensure_index(
        "data_center_broadband_connection_tract_idx",
        f"{CONNECTION_TABLE} (census_tract_geoid)",
    )
    ensure_index(
        "data_center_broadband_connection_status_idx",
        f"{CONNECTION_TABLE} (fcc_bdc_status)",
    )
def rebuild_connection_base(cur, status: str) -> int:
    """Empty ``CONNECTION_TABLE`` and refill it from ``MASTER_TABLE``.

    The inserted rows are selected from ``MASTER_TABLE`` left-joined to
    ``TRACT_TABLE`` on ``geoid`` (both sides cast to text), which supplies the
    census broadband subscription percentage where a tract matches.

    Args:
        cur: Open database cursor.
        status: Value written to ``fcc_bdc_status`` on every inserted row.

    Returns:
        The row count of ``CONNECTION_TABLE`` after the insert.
    """
    insert_sql = f"""
        insert into {CONNECTION_TABLE} (
            master_id, source, name, operator, city, state, country,
            longitude, latitude, geom,
            census_tract_geoid, census_broadband_subscription_pct,
            fcc_bdc_status
        )
        select
            dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,
            dc.longitude, dc.latitude, dc.geom,
            dc.geoid as census_tract_geoid,
            tr.broadband_subscription_pct as census_broadband_subscription_pct,
            %s as fcc_bdc_status
        from {MASTER_TABLE} dc
        left join {TRACT_TABLE} tr on tr.geoid::text = dc.geoid::text
        """

    cur.execute(f"truncate {CONNECTION_TABLE}")
    # status is passed as a bound parameter, not interpolated into the SQL.
    cur.execute(insert_sql, (status,))

    cur.execute(f"select count(*) from {CONNECTION_TABLE}")
    count_row = cur.fetchone()
    return count_row[0]
def latest_availability_date(rows: list[dict[str, Any]]) -> date | None:
    """Return the most recent ``as_of_date`` among availability rows.

    A row counts as an availability row when its ``data_type``, lower-cased,
    is ``"availability"`` or ``"availability data"``.

    Args:
        rows: Row dictionaries carrying ``data_type`` and ``as_of_date`` keys.

    Returns:
        The latest parsed date, or ``None`` when no availability row has one.
    """
    wanted = {"availability", "availability data"}
    parsed = []
    for row in rows:
        kind = str(row.get("data_type", "")).lower()
        if kind in wanted:
            parsed.append(parse_date(row.get("as_of_date")))
    # Rows with a missing/empty as_of_date parse to None and are ignored.
    return max((day for day in parsed if day is not None), default=None)
def load_as_of_dates(cur) -> date:
    """Fetch the FCC as-of dates, upsert them, and return the latest availability date.

    Every row from ``/map/listAsOfDates`` that has a parseable ``as_of_date``
    is upserted into ``AS_OF_TABLE`` keyed by ``(data_type, as_of_date)``; an
    existing row gets its ``raw`` payload and ``fetched_at`` refreshed.

    Args:
        cur: Open database cursor.

    Returns:
        The most recent availability as-of date found in the response.

    Raises:
        RuntimeError: If the response contains no availability as-of date.
    """
    upsert_sql = f"""
        insert into {AS_OF_TABLE} (data_type, as_of_date, raw)
        values %s
        on conflict (data_type, as_of_date) do update set
            raw = excluded.raw,
            fetched_at = now()
        """

    rows = fcc_get("/map/listAsOfDates").get("data") or []

    records = []
    for row in rows:
        parsed = parse_date(row.get("as_of_date"))
        if parsed:
            records.append((row.get("data_type"), parsed, Json(row)))

    if records:
        execute_values(cur, upsert_sql, records, page_size=1000)

    newest = latest_availability_date(rows)
    if newest is not None:
        return newest
    raise RuntimeError(f"Could not find an availability as_of_date in FCC response: {rows}")
def load_availability_file_catalog(cur, as_of_date: date) -> int:
    """Replace the stored fixed-broadband file catalog for one as-of date.

    Requests the availability download listing for *as_of_date*, filtered to
    ``technology_type`` "Fixed Broadband". When at least one usable row comes
    back, the rows already stored in ``FILES_TABLE`` for that date are deleted
    and the new ones inserted; otherwise the table is left as it is.

    Args:
        cur: Open database cursor.
        as_of_date: Availability as-of date whose catalog is requested.

    Returns:
        The number of catalog rows inserted.
    """
    response = fcc_get(
        f"/map/downloads/listAvailabilityData/{as_of_date:%Y-%m-%d}",
        params={"technology_type": "Fixed Broadband"},
    )

    records = []
    for entry in response.get("data") or []:
        file_id = to_int(entry.get("file_id"))
        if file_id is None:
            # file_id is part of the primary key, so such rows are skipped.
            continue
        field = entry.get
        records.append(
            (
                as_of_date,
                file_id,
                field("category"),
                field("subcategory"),
                field("technology_type"),
                field("technology_code"),
                field("technology_code_desc"),
                field("speed_tier"),
                field("state_fips"),
                field("state_name"),
                to_int(field("provider_id")),
                field("provider_name"),
                field("file_type"),
                field("file_name"),
                to_int(field("record_count")),
                Json(entry),
            )
        )

    if not records:
        return 0

    insert_sql = f"""
        insert into {FILES_TABLE} (
            as_of_date, file_id, category, subcategory, technology_type,
            technology_code, technology_code_desc, speed_tier, state_fips,
            state_name, provider_id, provider_name, file_type, file_name,
            record_count, raw
        )
        values %s
        """
    cur.execute(f"delete from {FILES_TABLE} where as_of_date = %s", (as_of_date,))
    execute_values(cur, insert_sql, records, page_size=1000)
    return len(records)
def main() -> int:
    """Rebuild the connection table and, when possible, load the FCC file catalog.

    Steps:
        1. Parse CLI flags, load ``~/.zsh_secrets``, and require the DB env vars.
        2. Create the tables and rebuild the base connection rows.
        3. Unless ``--skip-fcc`` is given or FCC credentials are incomplete,
           load the availability file catalog for the requested (or latest)
           as-of date and mark every connection row ``fcc_catalog_loaded``.

    Returns:
        ``0`` on success (including ``--skip-fcc``); ``2`` when the FCC
        credentials are incomplete and the catalog could not be loaded.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--skip-fcc", action="store_true", help="Only create/rebuild the base connection table.")
    parser.add_argument("--as-of-date", help="FCC BDC availability as-of date, YYYY-MM-DD. Defaults to latest.")
    args = parser.parse_args()

    load_zsh_secrets()
    require_env(["PGWEB_HOST", "PGWEB_PORT", "PGWEB_USER", "PGWEB_PASSWORD"])

    username, hash_value = fcc_credentials()
    if args.skip_fcc:
        status = "fcc_skipped"
    elif hash_value and not username:
        status = "pending_fcc_username"
    else:
        status = "pending_fcc_catalog"

    conn = get_conn()
    try:
        # "with conn" only ends the transaction (commit, or rollback on an
        # exception); it does not close the connection, hence the finally.
        with conn:
            with conn.cursor() as cur:
                create_tables(cur)
                n_connection = rebuild_connection_base(cur, status)
                print(f"{CONNECTION_TABLE}: {n_connection:,} base rows")

                if args.skip_fcc:
                    conn.commit()
                    return 0

                if not username or not hash_value:
                    print(
                        "FCC catalog not loaded: set FCC_USERNAME or FCC_BDC_USERNAME "
                        "alongside FCC_API_KEY/FCC_HASH_VALUE in ~/.zsh_secrets.",
                        file=sys.stderr,
                    )
                    # Keep the rebuilt base rows even though the catalog is missing.
                    conn.commit()
                    return 2

                as_of_date = parse_date(args.as_of_date) if args.as_of_date else load_as_of_dates(cur)
                n_files = load_availability_file_catalog(cur, as_of_date)
                cur.execute(
                    f"""
                    update {CONNECTION_TABLE}
                    set fcc_bdc_status = 'fcc_catalog_loaded',
                        fcc_bdc_as_of_date = %s,
                        updated_at = now()
                    """,
                    (as_of_date,),
                )
                conn.commit()

                if args.as_of_date:
                    # AS_OF_TABLE is not touched when the date comes from the
                    # command line, so do not claim it was loaded.
                    print(f"{AS_OF_TABLE}: not refreshed; using requested availability date {as_of_date}")
                else:
                    print(f"{AS_OF_TABLE}: loaded latest availability date {as_of_date}")
                print(f"{FILES_TABLE}: {n_files:,} fixed-broadband file catalog rows")
                return 0
    finally:
        conn.close()
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())