Move all Python scripts to scripts/, documentation to docs/, raw input data to data/, and generated HTML/CSV outputs to output/. Update path references in 8 scripts to use Path(__file__).parent.parent as project root so they work correctly from the new location. Update README links and quick-start commands accordingly. Notebooks remain at root. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
398 lines
12 KiB
Python
398 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Build data-center broadband connection tables.
|
|
|
|
Creates a per-data-center broadband connection table and, when FCC BDC API
|
|
credentials are available, stores the FCC BDC public download catalog.
|
|
|
|
Required DB env vars:
|
|
PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD
|
|
|
|
FCC API env vars:
|
|
FCC_USERNAME or FCC_BDC_USERNAME - FCC User Registration username/email
|
|
FCC_API_KEY or FCC_HASH_VALUE - BDC public API hash_value token
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import date, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import psycopg2
|
|
import requests
|
|
from psycopg2.extras import Json, execute_values
|
|
|
|
|
|
# Name of the PostgreSQL database (the same value get_conn() connects to).
DB_NAME = "data_centers"

# Existing tables read by rebuild_connection_base().
MASTER_TABLE = "public.master_data_centers"
TRACT_TABLE = "public.data_center_census_tracts_2024"
# Tables created (when missing) by create_tables().
AS_OF_TABLE = "public.fcc_bdc_api_as_of_dates"
FILES_TABLE = "public.fcc_bdc_availability_files"
CONNECTION_TABLE = "public.data_center_broadband_connection"

# Prefix prepended to every path passed to fcc_get().
FCC_BASE_URL = "https://broadbandmap.fcc.gov/api/public"
# Sent as the user-agent header on FCC API requests.
USER_AGENT = "data-center-fcc-bdc-loader/1.0"
|
|
|
|
|
|
def load_zsh_secrets() -> None:
    """Copy variables exported by ~/.zsh_secrets into ``os.environ``.

    Does nothing when the secrets file is absent. Otherwise a login zsh
    sources the file and dumps its environment; each ``NAME=value`` line is
    copied into this process unless ``NAME`` is already set here. Values are
    never printed.

    Raises:
        subprocess.CalledProcessError: if the zsh command exits non-zero.
    """
    if not (Path.home() / ".zsh_secrets").exists():
        return

    completed = subprocess.run(
        ["zsh", "-lc", "source ~/.zsh_secrets >/dev/null 2>&1; env"],
        check=True,
        capture_output=True,
        text=True,
    )
    for entry in completed.stdout.splitlines():
        name, separator, value = entry.partition("=")
        # Ignore lines without "=" or with an empty name, and never override
        # a variable that this process already has.
        if separator and name and name not in os.environ:
            os.environ[name] = value
|
|
|
|
|
|
def require_env(keys: list[str]) -> None:
    """Ensure every named environment variable is set to a non-empty value.

    Raises:
        RuntimeError: naming every variable that is unset or empty.
    """
    absent = [name for name in keys if not os.environ.get(name)]
    if not absent:
        return
    raise RuntimeError("Missing required env vars: " + ", ".join(absent))
|
|
|
|
|
|
def get_conn():
    """Open a psycopg2 connection to the ``DB_NAME`` database.

    Host, port, user and password are read from the PGWEB_HOST, PGWEB_PORT,
    PGWEB_USER and PGWEB_PASSWORD environment variables.

    Raises:
        KeyError: if any of the PGWEB_* variables is unset.
    """
    return psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        # Use the module constant instead of repeating the literal name.
        dbname=DB_NAME,
    )
|
|
|
|
|
|
def fcc_credentials() -> tuple[str | None, str | None]:
    """Return the FCC ``(username, hash_value)`` pair from the environment.

    The username comes from FCC_USERNAME, falling back to FCC_BDC_USERNAME;
    the token comes from FCC_API_KEY, falling back to FCC_HASH_VALUE. An
    empty primary variable also triggers the fallback.
    """
    env = os.environ
    username = env.get("FCC_USERNAME") or env.get("FCC_BDC_USERNAME")
    token = env.get("FCC_API_KEY") or env.get("FCC_HASH_VALUE")
    return (username, token)
|
|
|
|
|
|
def fcc_get(path: str, *, params: dict[str, Any] | None = None) -> dict[str, Any]:
    """GET ``FCC_BASE_URL + path`` and return the decoded JSON payload.

    Credentials from ``fcc_credentials()`` are sent as the ``username`` and
    ``hash_value`` request headers.

    Args:
        path: Path appended to ``FCC_BASE_URL``.
        params: Optional query-string parameters.

    Raises:
        RuntimeError: if credentials are incomplete, or the payload reports
            status_code 401/403 or ``status == "fail"``.
        requests.HTTPError: if the HTTP response status is an error.
    """
    username, hash_value = fcc_credentials()
    if not (username and hash_value):
        raise RuntimeError(
            "FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus "
            "FCC_API_KEY or FCC_HASH_VALUE."
        )

    response = requests.get(
        f"{FCC_BASE_URL}{path}",
        headers={
            "username": username,
            "hash_value": hash_value,
            "user-agent": USER_AGENT,
            "accept": "application/json",
        },
        params=params if params else {},
        timeout=60,
    )
    response.raise_for_status()

    body = response.json()
    # The payload can carry its own failure markers; check them even though
    # raise_for_status() did not raise.
    auth_rejected = str(body.get("status_code")) in {"401", "403"}
    if auth_rejected or body.get("status") == "fail":
        raise RuntimeError(f"FCC API error for {path}: {body}")
    return body
|
|
|
|
|
|
def parse_date(value: Any) -> date | None:
    """Coerce *value* to a plain ``datetime.date``.

    Args:
        value: ``None`` or ``""`` (returns ``None``), a ``datetime`` (its
            date part is returned), a ``date`` (returned unchanged), or any
            other object whose ``str()`` starts with ``YYYY-MM-DD``.

    Returns:
        The parsed date, or ``None`` for empty input.

    Raises:
        ValueError: if the first ten characters are not a valid
            ``YYYY-MM-DD`` date.
    """
    if value in (None, ""):
        return None
    # datetime subclasses date, so test it first: returning it unchanged
    # would leak a time component and break comparisons with plain dates.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    # Only the leading YYYY-MM-DD is parsed; any trailing text is ignored.
    return datetime.strptime(str(value)[:10], "%Y-%m-%d").date()
|
|
|
|
|
|
def to_int(value: Any) -> int | None:
    """Convert *value* to ``int``, tolerating thousands separators.

    Returns ``None`` for ``None``, ``""``, or anything whose string form
    (with commas removed) is not a valid integer literal.
    """
    if value in ("", None):
        return None
    try:
        parsed = int(str(value).replace(",", ""))
    except (TypeError, ValueError):
        parsed = None
    return parsed
|
|
|
|
|
|
def create_tables(cur) -> None:
    """Create the PostGIS extension, the FCC BDC tables and their indexes.

    Every statement is guarded with ``if not exists``. Creates, in order:
    ``AS_OF_TABLE``, ``FILES_TABLE`` with three indexes, and
    ``CONNECTION_TABLE`` with three indexes.

    Args:
        cur: Open database cursor used to execute the DDL.
    """
    cur.execute("create extension if not exists postgis")

    cur.execute(
        f"""
        create table if not exists {AS_OF_TABLE} (
            data_type text not null,
            as_of_date date not null,
            raw jsonb not null,
            fetched_at timestamptz not null default now(),
            primary key (data_type, as_of_date)
        )
        """
    )

    cur.execute(
        f"""
        create table if not exists {FILES_TABLE} (
            as_of_date date not null,
            file_id bigint not null,
            category text,
            subcategory text,
            technology_type text,
            technology_code text,
            technology_code_desc text,
            speed_tier text,
            state_fips text,
            state_name text,
            provider_id bigint,
            provider_name text,
            file_type text,
            file_name text,
            record_count bigint,
            raw jsonb not null,
            fetched_at timestamptz not null default now(),
            primary key (as_of_date, file_id)
        )
        """
    )
    for index_name, target in (
        ("fcc_bdc_availability_files_category_idx", "(category, subcategory)"),
        ("fcc_bdc_availability_files_state_idx", "(state_fips)"),
        ("fcc_bdc_availability_files_provider_idx", "(provider_id)"),
    ):
        cur.execute(
            f"create index if not exists {index_name} on {FILES_TABLE} {target}"
        )

    # The foreign key references MASTER_TABLE so it always points at the same
    # table rebuild_connection_base() selects from.
    cur.execute(
        f"""
        create table if not exists {CONNECTION_TABLE} (
            master_id text primary key references {MASTER_TABLE}(master_id) on delete cascade,
            source text,
            name text,
            operator text,
            city text,
            state text,
            country text,
            longitude double precision,
            latitude double precision,
            geom geometry(Point, 4326),

            census_tract_geoid text,
            census_broadband_subscription_pct numeric,

            fcc_bdc_status text not null,
            fcc_bdc_as_of_date date,
            fcc_bdc_geography_type text,
            fcc_bdc_geoid text,

            fcc_provider_count integer,
            fcc_fiber_provider_count integer,
            fcc_cable_provider_count integer,
            fcc_fixed_wireless_provider_count integer,
            fcc_max_advertised_download_mbps numeric,
            fcc_max_advertised_upload_mbps numeric,
            fcc_100_20_provider_count integer,
            fcc_summary_json jsonb,

            fetched_at timestamptz not null default now(),
            updated_at timestamptz not null default now()
        )
        """
    )
    for index_name, target in (
        ("data_center_broadband_connection_geom_gix", "using gist (geom)"),
        ("data_center_broadband_connection_tract_idx", "(census_tract_geoid)"),
        ("data_center_broadband_connection_status_idx", "(fcc_bdc_status)"),
    ):
        cur.execute(
            f"create index if not exists {index_name} on {CONNECTION_TABLE} {target}"
        )
|
|
|
|
|
|
def rebuild_connection_base(cur, status: str) -> int:
    """Empty ``CONNECTION_TABLE`` and refill it from ``MASTER_TABLE``.

    Each master row is copied with its identity and location columns,
    left-joined to ``TRACT_TABLE`` on geoid (compared as text) to pick up
    ``broadband_subscription_pct``, and stamped with *status* as
    ``fcc_bdc_status``.

    Args:
        cur: Open database cursor.
        status: Value written to ``fcc_bdc_status`` on every inserted row.

    Returns:
        Number of rows in ``CONNECTION_TABLE`` after the rebuild.
    """
    refill_sql = f"""
        insert into {CONNECTION_TABLE} (
            master_id, source, name, operator, city, state, country,
            longitude, latitude, geom,
            census_tract_geoid, census_broadband_subscription_pct,
            fcc_bdc_status
        )
        select
            dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,
            dc.longitude, dc.latitude, dc.geom,
            dc.geoid as census_tract_geoid,
            tr.broadband_subscription_pct as census_broadband_subscription_pct,
            %s as fcc_bdc_status
        from {MASTER_TABLE} dc
        left join {TRACT_TABLE} tr on tr.geoid::text = dc.geoid::text
        """

    cur.execute(f"truncate {CONNECTION_TABLE}")
    cur.execute(refill_sql, (status,))

    cur.execute(f"select count(*) from {CONNECTION_TABLE}")
    count_row = cur.fetchone()
    return count_row[0]
|
|
|
|
|
|
def latest_availability_date(rows: list[dict[str, Any]]) -> date | None:
    """Return the newest ``as_of_date`` among availability rows.

    A row counts as an availability row when its ``data_type``,
    lower-cased, is ``"availability"`` or ``"availability data"``. Rows
    whose date parses to ``None`` are ignored.

    Returns:
        The latest date, or ``None`` when no row qualifies.
    """
    wanted_types = {"availability", "availability data"}
    parsed = [
        parse_date(entry.get("as_of_date"))
        for entry in rows
        if str(entry.get("data_type", "")).lower() in wanted_types
    ]
    usable = [candidate for candidate in parsed if candidate is not None]
    if not usable:
        return None
    return max(usable)
|
|
|
|
|
|
def load_as_of_dates(cur) -> date:
    """Fetch the FCC as-of-date list, upsert it, and return the latest date.

    Calls ``/map/listAsOfDates``, stores every row with a parseable
    ``as_of_date`` in ``AS_OF_TABLE`` (updating ``raw`` and ``fetched_at``
    on conflict), then picks the newest availability date.

    Args:
        cur: Open database cursor.

    Returns:
        The latest availability as-of date.

    Raises:
        RuntimeError: if the response contains no availability as-of date.
    """
    rows = fcc_get("/map/listAsOfDates").get("data") or []

    # Pair each row with its parsed date so unparseable rows can be dropped.
    dated_rows = ((row, parse_date(row.get("as_of_date"))) for row in rows)
    values = [
        (row.get("data_type"), as_of, Json(row))
        for row, as_of in dated_rows
        if as_of
    ]

    if values:
        upsert_sql = f"""
            insert into {AS_OF_TABLE} (data_type, as_of_date, raw)
            values %s
            on conflict (data_type, as_of_date) do update set
                raw = excluded.raw,
                fetched_at = now()
            """
        execute_values(cur, upsert_sql, values, page_size=1000)

    latest = latest_availability_date(rows)
    if latest is not None:
        return latest
    raise RuntimeError(f"Could not find an availability as_of_date in FCC response: {rows}")
|
|
|
|
|
|
def load_availability_file_catalog(cur, as_of_date: date) -> int:
    """Replace the stored file catalog for *as_of_date* with the FCC listing.

    Requests ``/map/downloads/listAvailabilityData/<date>`` filtered to
    ``technology_type = "Fixed Broadband"``. Rows without an integer
    ``file_id`` are skipped. When at least one row remains, existing
    ``FILES_TABLE`` rows for the date are deleted and the new ones inserted;
    otherwise the table is left untouched.

    Args:
        cur: Open database cursor.
        as_of_date: Availability as-of date to load.

    Returns:
        Number of catalog rows inserted.
    """
    payload = fcc_get(
        f"/map/downloads/listAvailabilityData/{as_of_date:%Y-%m-%d}",
        params={"technology_type": "Fixed Broadband"},
    )

    # Text columns copied verbatim, in table-column order, right after file_id.
    descriptive_keys = (
        "category",
        "subcategory",
        "technology_type",
        "technology_code",
        "technology_code_desc",
        "speed_tier",
        "state_fips",
        "state_name",
    )

    values = []
    for entry in payload.get("data") or []:
        file_id = to_int(entry.get("file_id"))
        if file_id is None:
            continue
        record = (
            as_of_date,
            file_id,
            *(entry.get(key) for key in descriptive_keys),
            to_int(entry.get("provider_id")),
            entry.get("provider_name"),
            entry.get("file_type"),
            entry.get("file_name"),
            to_int(entry.get("record_count")),
            Json(entry),
        )
        values.append(record)

    if not values:
        return len(values)

    cur.execute(f"delete from {FILES_TABLE} where as_of_date = %s", (as_of_date,))
    insert_sql = f"""
        insert into {FILES_TABLE} (
            as_of_date, file_id, category, subcategory, technology_type,
            technology_code, technology_code_desc, speed_tier, state_fips,
            state_name, provider_id, provider_name, file_type, file_name,
            record_count, raw
        )
        values %s
        """
    execute_values(cur, insert_sql, values, page_size=1000)
    return len(values)
|
|
|
|
|
|
def main() -> int:
    """Rebuild the connection table and, when possible, load the FCC catalog.

    Steps: parse CLI flags, load shell secrets, verify the PGWEB_* variables,
    create tables, rebuild the base connection rows, then (unless
    ``--skip-fcc`` is given or FCC credentials are incomplete) load the
    availability file catalog and mark every connection row as
    ``fcc_catalog_loaded``.

    Returns:
        0 on success (including ``--skip-fcc``); 2 when FCC credentials are
        incomplete and only the base table was rebuilt.

    Raises:
        RuntimeError: if a required PGWEB_* variable is missing or the FCC
            API reports an error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--skip-fcc", action="store_true", help="Only create/rebuild the base connection table.")
    parser.add_argument("--as-of-date", help="FCC BDC availability as-of date, YYYY-MM-DD. Defaults to latest.")
    args = parser.parse_args()

    load_zsh_secrets()
    require_env(["PGWEB_HOST", "PGWEB_PORT", "PGWEB_USER", "PGWEB_PASSWORD"])

    username, hash_value = fcc_credentials()
    status = "pending_fcc_username" if hash_value and not username else "pending_fcc_catalog"
    if args.skip_fcc:
        status = "fcc_skipped"

    conn = get_conn()
    try:
        # psycopg2's connection context manager only commits or rolls back
        # the transaction; it does not close the connection, hence the
        # explicit close() in the finally clause.
        with conn, conn.cursor() as cur:
            create_tables(cur)
            n_connection = rebuild_connection_base(cur, status)
            print(f"{CONNECTION_TABLE}: {n_connection:,} base rows")

            if args.skip_fcc:
                conn.commit()
                return 0

            if not username or not hash_value:
                print(
                    "FCC catalog not loaded: set FCC_USERNAME or FCC_BDC_USERNAME "
                    "alongside FCC_API_KEY/FCC_HASH_VALUE in ~/.zsh_secrets.",
                    file=sys.stderr,
                )
                conn.commit()
                return 2

            # An explicit --as-of-date skips the as-of-date lookup entirely.
            as_of_date = parse_date(args.as_of_date) if args.as_of_date else load_as_of_dates(cur)
            n_files = load_availability_file_catalog(cur, as_of_date)

            # No WHERE clause: every connection row gets the same status/date.
            cur.execute(
                f"""
                update {CONNECTION_TABLE}
                set fcc_bdc_status = 'fcc_catalog_loaded',
                    fcc_bdc_as_of_date = %s,
                    updated_at = now()
                """,
                (as_of_date,),
            )
            conn.commit()

            print(f"{AS_OF_TABLE}: loaded latest availability date {as_of_date}")
            print(f"{FILES_TABLE}: {n_files:,} fixed-broadband file catalog rows")
            return 0
    finally:
        conn.close()
|
|
|
|
|
|
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|