1535 lines
63 KiB
Plaintext
1535 lines
63 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "0",
|
|
"metadata": {},
|
|
"source": [
|
|
"# FCC BDC Broadband Connection Tables for Master Data Centers\n",
|
|
"\n",
|
|
"Notebook version of [build_fcc_bdc_broadband_connection_table.py](build_fcc_bdc_broadband_connection_table.py).\n",
|
|
"\n",
|
|
"Builds and refreshes:\n",
|
|
"\n",
|
|
"1. `public.fcc_bdc_api_as_of_dates` - FCC BDC API as-of date catalog\n",
|
|
"2. `public.fcc_bdc_availability_files` - FCC fixed-broadband availability file catalog for an as-of date\n",
|
|
"3. `public.data_center_broadband_connection` - per-data-center broadband connection base table\n",
|
|
"\n",
|
|
"If FCC credentials are missing, the notebook still rebuilds the base connection table and leaves FCC status as pending."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from __future__ import annotations\n",
|
|
"\n",
|
|
"import os\n",
|
|
"import subprocess\n",
|
|
"from datetime import date, datetime\n",
|
|
"from pathlib import Path\n",
|
|
"from typing import Any\n",
|
|
"\n",
|
|
"import pandas as pd\n",
|
|
"import psycopg2\n",
|
|
"import requests\n",
|
|
"from psycopg2.extras import Json, execute_values\n",
|
|
"\n",
|
|
"pd.set_option('display.max_columns', 200)\n",
|
|
"\n",
|
|
"print('pandas: ', pd.__version__)\n",
|
|
"print('psycopg2:', psycopg2.__version__)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "2",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"def load_env_file(env_path: str = '.env') -> None:\n",
|
|
" p = Path(env_path)\n",
|
|
" if not p.exists():\n",
|
|
" print(f'No {env_path} file found in {Path.cwd()}')\n",
|
|
" return\n",
|
|
"\n",
|
|
" loaded = 0\n",
|
|
" for raw_line in p.read_text(encoding='utf-8').splitlines():\n",
|
|
" line = raw_line.strip()\n",
|
|
" if not line or line.startswith('#') or '=' not in line:\n",
|
|
" continue\n",
|
|
" key, value = line.split('=', 1)\n",
|
|
" key = key.strip()\n",
|
|
" value = value.strip().strip('\\\"').strip(\"'\")\n",
|
|
" if key and key not in os.environ:\n",
|
|
" os.environ[key] = value\n",
|
|
" loaded += 1\n",
|
|
" print(f'Loaded {loaded} env var(s) from {env_path}')\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_zsh_secrets() -> None:\n",
|
|
" secrets = Path.home() / '.zsh_secrets'\n",
|
|
" if not secrets.exists():\n",
|
|
" return\n",
|
|
"\n",
|
|
" result = subprocess.run(\n",
|
|
" ['zsh', '-lc', 'source ~/.zsh_secrets >/dev/null 2>&1; env'],\n",
|
|
" check=True,\n",
|
|
" capture_output=True,\n",
|
|
" text=True,\n",
|
|
" )\n",
|
|
" for line in result.stdout.splitlines():\n",
|
|
" if '=' not in line:\n",
|
|
" continue\n",
|
|
" key, value = line.split('=', 1)\n",
|
|
" if key and key not in os.environ:\n",
|
|
" os.environ[key] = value\n",
|
|
"\n",
|
|
"\n",
|
|
"def require_env(keys: list[str]) -> None:\n",
|
|
" missing = [k for k in keys if not os.getenv(k)]\n",
|
|
" if missing:\n",
|
|
" raise RuntimeError('Missing required env vars: ' + ', '.join(missing))\n",
|
|
"\n",
|
|
"\n",
|
|
"load_env_file('.env')\n",
|
|
"load_zsh_secrets()\n",
|
|
"require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "3",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"DB_NAME = 'data_centers'\n",
|
|
"\n",
|
|
"MASTER_TABLE = 'public.master_data_centers'\n",
|
|
"TRACT_TABLE = 'public.data_center_census_tracts_2024'\n",
|
|
"AS_OF_TABLE = 'public.fcc_bdc_api_as_of_dates'\n",
|
|
"FILES_TABLE = 'public.fcc_bdc_availability_files'\n",
|
|
"CONNECTION_TABLE = 'public.data_center_broadband_connection'\n",
|
|
"\n",
|
|
"FCC_BASE_URL = 'https://broadbandmap.fcc.gov/api/public'\n",
|
|
"USER_AGENT = 'data-center-fcc-bdc-loader/1.0'\n",
|
|
"\n",
|
|
"\n",
|
|
"def get_conn():\n",
|
|
" return psycopg2.connect(\n",
|
|
" host=os.environ['PGWEB_HOST'],\n",
|
|
" port=os.environ['PGWEB_PORT'],\n",
|
|
" user=os.environ['PGWEB_USER'],\n",
|
|
" password=os.environ['PGWEB_PASSWORD'],\n",
|
|
" dbname=DB_NAME,\n",
|
|
" )\n",
|
|
"\n",
|
|
"\n",
|
|
"with get_conn() as conn:\n",
|
|
" with conn.cursor() as cur:\n",
|
|
" cur.execute('select current_database(), current_user')\n",
|
|
" print('Connected:', cur.fetchone())\n",
|
|
" cur.execute('create extension if not exists postgis')\n",
|
|
" for t in (MASTER_TABLE, TRACT_TABLE):\n",
|
|
" cur.execute('select to_regclass(%s)', (t,))\n",
|
|
" status = 'OK' if cur.fetchone()[0] is not None else 'MISSING'\n",
|
|
" print(f'{t}: {status}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "4",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Parameters"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "5",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Set True to build only the base connection table and skip FCC API calls.\n",
|
|
"SKIP_FCC = False\n",
|
|
"\n",
|
|
"# Optional override in YYYY-MM-DD format; when None, uses latest from FCC API.\n",
|
|
"AS_OF_DATE_OVERRIDE = None"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "6",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Create Tables and Base Utilities"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "7",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"def fcc_credentials() -> tuple[str | None, str | None]:\n",
|
|
" username = os.getenv('FCC_USERNAME') or os.getenv('FCC_BDC_USERNAME')\n",
|
|
" hash_value = os.getenv('FCC_API_KEY') or os.getenv('FCC_HASH_VALUE')\n",
|
|
" return username, hash_value\n",
|
|
"\n",
|
|
"\n",
|
|
"def fcc_get(path: str, *, params: dict[str, Any] | None = None) -> dict[str, Any]:\n",
|
|
" username, hash_value = fcc_credentials()\n",
|
|
" if not username or not hash_value:\n",
|
|
" raise RuntimeError(\n",
|
|
" 'FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus FCC_API_KEY or FCC_HASH_VALUE.'\n",
|
|
" )\n",
|
|
"\n",
|
|
" url = f'{FCC_BASE_URL}{path}'\n",
|
|
" headers = {\n",
|
|
" 'username': username,\n",
|
|
" 'hash_value': hash_value,\n",
|
|
" 'user-agent': USER_AGENT,\n",
|
|
" 'accept': 'application/json',\n",
|
|
" }\n",
|
|
" response = requests.get(url, headers=headers, params=params or {}, timeout=60)\n",
|
|
" response.raise_for_status()\n",
|
|
" payload = response.json()\n",
|
|
" if str(payload.get('status_code')) in {'401', '403'} or payload.get('status') == 'fail':\n",
|
|
" raise RuntimeError(f'FCC API error for {path}: {payload}')\n",
|
|
" return payload\n",
|
|
"\n",
|
|
"\n",
|
|
"def parse_date(value: Any) -> date | None:\n",
|
|
" if value in (None, ''):\n",
|
|
" return None\n",
|
|
" if isinstance(value, date):\n",
|
|
" return value\n",
|
|
" return datetime.strptime(str(value)[:10], '%Y-%m-%d').date()\n",
|
|
"\n",
|
|
"\n",
|
|
"def to_int(value: Any) -> int | None:\n",
|
|
" if value in (None, ''):\n",
|
|
" return None\n",
|
|
" try:\n",
|
|
" return int(str(value).replace(',', ''))\n",
|
|
" except (TypeError, ValueError):\n",
|
|
" return None\n",
|
|
"\n",
|
|
"\n",
|
|
"def create_tables(cur) -> None:\n",
|
|
" cur.execute('create extension if not exists postgis')\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" create table if not exists {AS_OF_TABLE} (\n",
|
|
" data_type text not null,\n",
|
|
" as_of_date date not null,\n",
|
|
" raw jsonb not null,\n",
|
|
" fetched_at timestamptz not null default now(),\n",
|
|
" primary key (data_type, as_of_date)\n",
|
|
" )\n",
|
|
" \"\"\"\n",
|
|
" )\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" create table if not exists {FILES_TABLE} (\n",
|
|
" as_of_date date not null,\n",
|
|
" file_id bigint not null,\n",
|
|
" category text,\n",
|
|
" subcategory text,\n",
|
|
" technology_type text,\n",
|
|
" technology_code text,\n",
|
|
" technology_code_desc text,\n",
|
|
" speed_tier text,\n",
|
|
" state_fips text,\n",
|
|
" state_name text,\n",
|
|
" provider_id bigint,\n",
|
|
" provider_name text,\n",
|
|
" file_type text,\n",
|
|
" file_name text,\n",
|
|
" record_count bigint,\n",
|
|
" raw jsonb not null,\n",
|
|
" fetched_at timestamptz not null default now(),\n",
|
|
" primary key (as_of_date, file_id)\n",
|
|
" )\n",
|
|
" \"\"\"\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists fcc_bdc_availability_files_category_idx on {FILES_TABLE} (category, subcategory)'\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists fcc_bdc_availability_files_state_idx on {FILES_TABLE} (state_fips)'\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists fcc_bdc_availability_files_provider_idx on {FILES_TABLE} (provider_id)'\n",
|
|
" )\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" create table if not exists {CONNECTION_TABLE} (\n",
|
|
" master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n",
|
|
" source text,\n",
|
|
" name text,\n",
|
|
" operator text,\n",
|
|
" city text,\n",
|
|
" state text,\n",
|
|
" country text,\n",
|
|
" longitude double precision,\n",
|
|
" latitude double precision,\n",
|
|
" geom geometry(Point, 4326),\n",
|
|
" census_tract_geoid text,\n",
|
|
" census_broadband_subscription_pct numeric,\n",
|
|
" fcc_bdc_status text not null,\n",
|
|
" fcc_bdc_as_of_date date,\n",
|
|
" fcc_bdc_geography_type text,\n",
|
|
" fcc_bdc_geoid text,\n",
|
|
" fcc_provider_count integer,\n",
|
|
" fcc_fiber_provider_count integer,\n",
|
|
" fcc_cable_provider_count integer,\n",
|
|
" fcc_fixed_wireless_provider_count integer,\n",
|
|
" fcc_max_advertised_download_mbps numeric,\n",
|
|
" fcc_max_advertised_upload_mbps numeric,\n",
|
|
" fcc_100_20_provider_count integer,\n",
|
|
" fcc_summary_json jsonb,\n",
|
|
" fetched_at timestamptz not null default now(),\n",
|
|
" updated_at timestamptz not null default now()\n",
|
|
" )\n",
|
|
" \"\"\"\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists data_center_broadband_connection_geom_gix on {CONNECTION_TABLE} using gist (geom)'\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists data_center_broadband_connection_tract_idx on {CONNECTION_TABLE} (census_tract_geoid)'\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists data_center_broadband_connection_status_idx on {CONNECTION_TABLE} (fcc_bdc_status)'\n",
|
|
" )\n",
|
|
"\n",
|
|
"\n",
|
|
"def rebuild_connection_base(cur, status: str) -> int:\n",
|
|
" cur.execute(f'truncate {CONNECTION_TABLE}')\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" insert into {CONNECTION_TABLE} (\n",
|
|
" master_id, source, name, operator, city, state, country,\n",
|
|
" longitude, latitude, geom,\n",
|
|
" census_tract_geoid, census_broadband_subscription_pct,\n",
|
|
" fcc_bdc_status\n",
|
|
" )\n",
|
|
" select\n",
|
|
" dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,\n",
|
|
" dc.longitude, dc.latitude, dc.geom,\n",
|
|
" dc.geoid as census_tract_geoid,\n",
|
|
" tr.broadband_subscription_pct as census_broadband_subscription_pct,\n",
|
|
" %s as fcc_bdc_status\n",
|
|
" from {MASTER_TABLE} dc\n",
|
|
" left join {TRACT_TABLE} tr on tr.geoid::text = dc.geoid::text\n",
|
|
" \"\"\",\n",
|
|
" (status,),\n",
|
|
" )\n",
|
|
" cur.execute(f'select count(*) from {CONNECTION_TABLE}')\n",
|
|
" return cur.fetchone()[0]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "8",
|
|
"metadata": {},
|
|
"source": [
|
|
"## FCC Catalog Load Functions"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "9",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"def latest_availability_date(rows: list[dict[str, Any]]) -> date | None:\n",
|
|
" dates = [\n",
|
|
" parse_date(r.get('as_of_date'))\n",
|
|
" for r in rows\n",
|
|
" if str(r.get('data_type', '')).lower() in {'availability', 'availability data'}\n",
|
|
" ]\n",
|
|
" dates = [d for d in dates if d is not None]\n",
|
|
" return max(dates) if dates else None\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_as_of_dates(cur) -> date:\n",
|
|
" payload = fcc_get('/map/listAsOfDates')\n",
|
|
" rows = payload.get('data') or []\n",
|
|
" values = []\n",
|
|
" for row in rows:\n",
|
|
" as_of_date = parse_date(row.get('as_of_date'))\n",
|
|
" if not as_of_date:\n",
|
|
" continue\n",
|
|
" values.append((row.get('data_type'), as_of_date, Json(row)))\n",
|
|
"\n",
|
|
" if values:\n",
|
|
" execute_values(\n",
|
|
" cur,\n",
|
|
" f\"\"\"\n",
|
|
" insert into {AS_OF_TABLE} (data_type, as_of_date, raw)\n",
|
|
" values %s\n",
|
|
" on conflict (data_type, as_of_date) do update set\n",
|
|
" raw = excluded.raw,\n",
|
|
" fetched_at = now()\n",
|
|
" \"\"\",\n",
|
|
" values,\n",
|
|
" page_size=1000,\n",
|
|
" )\n",
|
|
"\n",
|
|
" latest = latest_availability_date(rows)\n",
|
|
" if latest is None:\n",
|
|
" raise RuntimeError(f'Could not find an availability as_of_date in FCC response: {rows}')\n",
|
|
" return latest\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_availability_file_catalog(cur, as_of_date: date) -> int:\n",
|
|
" payload = fcc_get(\n",
|
|
" f'/map/downloads/listAvailabilityData/{as_of_date:%Y-%m-%d}',\n",
|
|
" params={'technology_type': 'Fixed Broadband'},\n",
|
|
" )\n",
|
|
" rows = payload.get('data') or []\n",
|
|
" values = []\n",
|
|
" for row in rows:\n",
|
|
" file_id = to_int(row.get('file_id'))\n",
|
|
" if file_id is None:\n",
|
|
" continue\n",
|
|
" values.append(\n",
|
|
" (\n",
|
|
" as_of_date,\n",
|
|
" file_id,\n",
|
|
" row.get('category'),\n",
|
|
" row.get('subcategory'),\n",
|
|
" row.get('technology_type'),\n",
|
|
" row.get('technology_code'),\n",
|
|
" row.get('technology_code_desc'),\n",
|
|
" row.get('speed_tier'),\n",
|
|
" row.get('state_fips'),\n",
|
|
" row.get('state_name'),\n",
|
|
" to_int(row.get('provider_id')),\n",
|
|
" row.get('provider_name'),\n",
|
|
" row.get('file_type'),\n",
|
|
" row.get('file_name'),\n",
|
|
" to_int(row.get('record_count')),\n",
|
|
" Json(row),\n",
|
|
" )\n",
|
|
" )\n",
|
|
"\n",
|
|
" if values:\n",
|
|
" cur.execute(f'delete from {FILES_TABLE} where as_of_date = %s', (as_of_date,))\n",
|
|
" execute_values(\n",
|
|
" cur,\n",
|
|
" f\"\"\"\n",
|
|
" insert into {FILES_TABLE} (\n",
|
|
" as_of_date, file_id, category, subcategory, technology_type,\n",
|
|
" technology_code, technology_code_desc, speed_tier, state_fips,\n",
|
|
" state_name, provider_id, provider_name, file_type, file_name,\n",
|
|
" record_count, raw\n",
|
|
" )\n",
|
|
" values %s\n",
|
|
" \"\"\",\n",
|
|
" values,\n",
|
|
" page_size=1000,\n",
|
|
" )\n",
|
|
" return len(values)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "10",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Run Load Pipeline\n",
|
|
"\n",
|
|
"This cell mirrors the script's `main()` behavior."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "11",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"username, hash_value = fcc_credentials()\n",
|
|
"status = 'pending_fcc_username' if hash_value and not username else 'pending_fcc_catalog'\n",
|
|
"if SKIP_FCC:\n",
|
|
" status = 'fcc_skipped'\n",
|
|
"\n",
|
|
"as_of_date = None\n",
|
|
"n_files = 0\n",
|
|
"\n",
|
|
"with get_conn() as conn:\n",
|
|
" with conn.cursor() as cur:\n",
|
|
" create_tables(cur)\n",
|
|
" n_connection = rebuild_connection_base(cur, status)\n",
|
|
" print(f'{CONNECTION_TABLE}: {n_connection:,} base rows')\n",
|
|
"\n",
|
|
" if SKIP_FCC:\n",
|
|
" conn.commit()\n",
|
|
" print('FCC load skipped (SKIP_FCC=True).')\n",
|
|
" elif not username or not hash_value:\n",
|
|
" conn.commit()\n",
|
|
" print('FCC catalog not loaded: set FCC_USERNAME/FCC_BDC_USERNAME and FCC_API_KEY/FCC_HASH_VALUE.')\n",
|
|
" else:\n",
|
|
" as_of_date = parse_date(AS_OF_DATE_OVERRIDE) if AS_OF_DATE_OVERRIDE else load_as_of_dates(cur)\n",
|
|
" n_files = load_availability_file_catalog(cur, as_of_date)\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" update {CONNECTION_TABLE}\n",
|
|
" set fcc_bdc_status = 'fcc_catalog_loaded',\n",
|
|
" fcc_bdc_as_of_date = %s,\n",
|
|
" updated_at = now()\n",
|
|
" \"\"\",\n",
|
|
" (as_of_date,),\n",
|
|
" )\n",
|
|
" conn.commit()\n",
|
|
"\n",
|
|
"if as_of_date is not None:\n",
|
|
" print(f'{AS_OF_TABLE}: loaded latest availability date {as_of_date}')\n",
|
|
" print(f'{FILES_TABLE}: {n_files:,} fixed-broadband file catalog rows')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "12",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Quick QA"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "13",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with get_conn() as conn:\n",
|
|
" q1 = f\"\"\"\n",
|
|
" select fcc_bdc_status, count(*) as n\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" group by 1\n",
|
|
" order by 2 desc\n",
|
|
" \"\"\"\n",
|
|
" display(pd.read_sql(q1, conn))\n",
|
|
"\n",
|
|
" q2 = f\"\"\"\n",
|
|
" select as_of_date, count(*) as file_rows\n",
|
|
" from {FILES_TABLE}\n",
|
|
" group by 1\n",
|
|
" order by as_of_date desc\n",
|
|
" limit 10\n",
|
|
" \"\"\"\n",
|
|
" display(pd.read_sql(q2, conn))\n",
|
|
"\n",
|
|
" q3 = f\"\"\"\n",
|
|
" select master_id, name, state, fcc_bdc_status, fcc_bdc_as_of_date\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" order by master_id\n",
|
|
" limit 20\n",
|
|
" \"\"\"\n",
|
|
" display(pd.read_sql(q3, conn))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "14",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Tables Created and Relationships\n",
|
|
"\n",
|
|
"### Tables Created\n",
|
|
"\n",
|
|
"1. **`public.fcc_bdc_api_as_of_dates`**\n",
|
|
" - Purpose: Stores FCC API-reported as-of dates by data type.\n",
|
|
" - Grain: one row per `(data_type, as_of_date)`.\n",
|
|
" - Key columns:\n",
|
|
" - `data_type`\n",
|
|
" - `as_of_date`\n",
|
|
" - `raw` (full FCC response payload for that row)\n",
|
|
"\n",
|
|
"2. **`public.fcc_bdc_availability_files`**\n",
|
|
" - Purpose: Stores FCC fixed-broadband availability file catalog entries for a specific as-of date.\n",
|
|
" - Grain: one row per `(as_of_date, file_id)`.\n",
|
|
" - Key columns:\n",
|
|
" - `as_of_date`\n",
|
|
" - `file_id`\n",
|
|
" - `category`, `subcategory`, `technology_type`, `technology_code_desc`\n",
|
|
" - `state_fips`, `state_name`\n",
|
|
" - `provider_id`, `provider_name`\n",
|
|
" - `file_name`, `record_count`, `raw`\n",
|
|
"\n",
|
|
"3. **`public.fcc_bdc_provider_summary`**\n",
|
|
" - Purpose: Stores provider-summary rows from the FCC provider-level download.\n",
|
|
" - Grain: one row per `(as_of_date, file_id, provider_id, technology_code, technology_code_desc)`.\n",
|
|
" - Key columns:\n",
|
|
" - `provider_id`, `holding_company`\n",
|
|
" - `technology_code`, `technology_code_desc`, `provider_class`\n",
|
|
" - `location_count_res`, `unit_count_res`, `location_count_bus`, `unit_count_bus`\n",
|
|
"\n",
|
|
"4. **`public.fcc_bdc_summary_geography`**\n",
|
|
" - Purpose: Stores FCC summary-by-geography rows for states and counties.\n",
|
|
" - Grain: one row per `(as_of_date, file_id, geography_type, geography_id, biz_res, technology)`.\n",
|
|
" - Key columns:\n",
|
|
" - `geography_type`, `geography_id`, `geography_desc_full`\n",
|
|
" - `technology`, `biz_res`\n",
|
|
" - `total_units`, `speed_02_02`, `speed_10_1`, `speed_25_3`, `speed_100_20`, `speed_250_25`, `speed_1000_100`\n",
|
|
"\n",
|
|
"5. **`public.data_center_broadband_connection`**\n",
|
|
" - Purpose: One-row-per-data-center connection profile and FCC load status.\n",
|
|
" - Grain: one row per `master_id`.\n",
|
|
" - Key columns:\n",
|
|
" - `master_id` (PK)\n",
|
|
" - core DC attributes copied from `public.master_data_centers`\n",
|
|
" - `census_tract_geoid`, `census_broadband_subscription_pct`\n",
|
|
" - FCC status/tracking fields: `fcc_bdc_status`, `fcc_bdc_as_of_date`\n",
|
|
" - scalar summary fields for download/upload speed, provider counts, and `fcc_summary_json`\n",
|
|
"\n",
|
|
"### Relationships\n",
|
|
"\n",
|
|
"- `public.master_data_centers` -> `public.data_center_broadband_connection`\n",
|
|
" - Relationship: **1:1 by `master_id`**\n",
|
|
" - Enforced by foreign key on `data_center_broadband_connection.master_id`.\n",
|
|
"\n",
|
|
"- `public.data_center_census_tracts_2024` -> `public.data_center_broadband_connection`\n",
|
|
" - Relationship: **many:1 via tract GEOID** during base rebuild.\n",
|
|
" - Join used in notebook: `data_center_census_tracts_2024.geoid::text = master_data_centers.geoid::text`.\n",
|
|
"\n",
|
|
"- `public.fcc_bdc_api_as_of_dates` -> `public.fcc_bdc_availability_files`\n",
|
|
" - Relationship: **1:many by `as_of_date`** (logical relationship).\n",
|
|
" - Not enforced with an explicit FK, but both tables are connected by matching `as_of_date`.\n",
|
|
"\n",
|
|
"- `public.fcc_bdc_availability_files` -> `public.data_center_broadband_connection`\n",
|
|
" - Current relationship: **status/date attribution plus summary-file discovery**.\n",
|
|
" - The notebook uses the availability catalog to find the summary and provider downloads for the current as-of date.\n",
|
|
"\n",
|
|
"- `public.fcc_bdc_summary_geography` -> `public.data_center_broadband_connection`\n",
|
|
" - Relationship: **many:1 via county/state GEOID fallback**.\n",
|
|
" - County rows are matched on the first 5 digits of `census_tract_geoid`; state rows are used as a fallback.\n",
|
|
"\n",
|
|
"- `public.fcc_bdc_provider_summary` -> `public.data_center_broadband_connection`\n",
|
|
" - Relationship: **global aggregate context**.\n",
|
|
" - Provider-count columns are filled from provider-summary aggregates because the provider file is not geography-specific.\n",
|
|
"\n",
|
|
"### Load Behavior Summary\n",
|
|
"\n",
|
|
"- Base rebuild always refreshes `public.data_center_broadband_connection` from master DC + tract context.\n",
|
|
"- If FCC credentials are available and `SKIP_FCC=False`:\n",
|
|
" - latest (or overridden) as-of dates are loaded into `public.fcc_bdc_api_as_of_dates`\n",
|
|
" - file catalog rows for that as-of date are loaded into `public.fcc_bdc_availability_files`\n",
|
|
" - summary-by-geography rows are staged into `public.fcc_bdc_summary_geography`\n",
|
|
" - provider-summary rows are staged into `public.fcc_bdc_provider_summary`\n",
|
|
" - `public.data_center_broadband_connection` is updated with FCC summary linkage and derived scalar fields"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "15",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Inspect FCC API Download Content\n",
|
|
"\n",
|
|
"This section checks what the FCC API actually returns in downloadable availability summary files, so we can map real fields to the currently-null `data_center_broadband_connection` columns."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "16",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import io\n",
|
|
"import zipfile\n",
|
|
"\n",
|
|
"\n",
|
|
"def download_availability_file(file_id: int, *, file_type: int | None = None) -> tuple[str, pd.DataFrame]:\n",
|
|
" path = f'/map/downloads/downloadFile/availability/{int(file_id)}'\n",
|
|
" if file_type is not None:\n",
|
|
" path = f'{path}/{int(file_type)}'\n",
|
|
"\n",
|
|
" payload = fcc_get(path) if False else None # keep linter quiet; endpoint returns binary, not JSON\n",
|
|
"\n",
|
|
" username, hash_value = fcc_credentials()\n",
|
|
" if not username or not hash_value:\n",
|
|
" raise RuntimeError('FCC credentials are required to download files.')\n",
|
|
"\n",
|
|
" headers = {\n",
|
|
" 'username': username,\n",
|
|
" 'hash_value': hash_value,\n",
|
|
" 'user-agent': USER_AGENT,\n",
|
|
" 'accept': '*/*',\n",
|
|
" }\n",
|
|
" response = requests.get(f'{FCC_BASE_URL}{path}', headers=headers, timeout=120)\n",
|
|
" response.raise_for_status()\n",
|
|
"\n",
|
|
" content_type = (response.headers.get('content-type') or '').lower()\n",
|
|
" if 'application/zip' not in content_type and not response.content.startswith(b'PK'):\n",
|
|
" raise RuntimeError(f'Expected a ZIP payload, got content-type={content_type}')\n",
|
|
"\n",
|
|
" with zipfile.ZipFile(io.BytesIO(response.content)) as zf:\n",
|
|
" csv_members = [n for n in zf.namelist() if n.lower().endswith('.csv')]\n",
|
|
" if not csv_members:\n",
|
|
" raise RuntimeError(f'ZIP has no CSV members: {zf.namelist()}')\n",
|
|
" csv_name = csv_members[0]\n",
|
|
" with zf.open(csv_name) as f:\n",
|
|
" df = pd.read_csv(f, low_memory=False)\n",
|
|
" return csv_name, df\n",
|
|
"\n",
|
|
"\n",
|
|
"with get_conn() as conn:\n",
|
|
" summary_files = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select as_of_date, file_id, category, subcategory, technology_type, file_type, file_name, record_count\n",
|
|
" from {FILES_TABLE}\n",
|
|
" where category = 'Summary'\n",
|
|
" order by as_of_date desc,\n",
|
|
" case subcategory when 'Summary by Geography Type - Other Geographies' then 0\n",
|
|
" when 'Summary by Geography Type - Census Place' then 1\n",
|
|
" when 'Provider Summary' then 2\n",
|
|
" else 3 end,\n",
|
|
" file_id\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
"display(summary_files.head(25))\n",
|
|
"print(f'Summary files available: {len(summary_files):,}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "17",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"if summary_files.empty:\n",
|
|
" raise RuntimeError('No Summary files found in FCC catalog table.')\n",
|
|
"\n",
|
|
"inspect_rows = []\n",
|
|
"for _, r in summary_files.head(3).iterrows():\n",
|
|
" csv_name, df_inspect = download_availability_file(int(r['file_id']))\n",
|
|
" cols = list(df_inspect.columns)\n",
|
|
" inspect_rows.append({\n",
|
|
" 'file_id': int(r['file_id']),\n",
|
|
" 'subcategory': r['subcategory'],\n",
|
|
" 'csv_name': csv_name,\n",
|
|
" 'rows': len(df_inspect),\n",
|
|
" 'cols': len(cols),\n",
|
|
" 'sample_columns': ', '.join(cols[:12]),\n",
|
|
" })\n",
|
|
"\n",
|
|
"inspect_df = pd.DataFrame(inspect_rows)\n",
|
|
"display(inspect_df)\n",
|
|
"\n",
|
|
"target_tokens = [\n",
|
|
" 'provider', 'technology', 'speed', 'download', 'upload', 'geography', 'geoid', 'state', 'county', 'place'\n",
|
|
"]\n",
|
|
"\n",
|
|
"first_file_id = int(summary_files.iloc[0]['file_id'])\n",
|
|
"first_csv, first_df = download_availability_file(first_file_id)\n",
|
|
"matching_cols = [c for c in first_df.columns if any(t in c.lower() for t in target_tokens)]\n",
|
|
"\n",
|
|
"print(f'Inspected file_id={first_file_id}, csv={first_csv}, rows={len(first_df):,}, cols={len(first_df.columns):,}')\n",
|
|
"print('Columns that may map to broadband summary fields:')\n",
|
|
"for c in matching_cols:\n",
|
|
" print(' -', c)\n",
|
|
"\n",
|
|
"display(first_df.head(10))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "18",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"SUMMARY_TABLE = 'public.fcc_bdc_summary_geography'\n",
|
|
"\n",
|
|
"\n",
|
|
"def to_float(value):\n",
|
|
" if value in (None, '', 'NA', 'N/A'):\n",
|
|
" return None\n",
|
|
" try:\n",
|
|
" return float(str(value).replace(',', '').strip())\n",
|
|
" except (TypeError, ValueError):\n",
|
|
" return None\n",
|
|
"\n",
|
|
"\n",
|
|
"def normalize_geography_id(geography_type: str, geography_id: str) -> str:\n",
|
|
" gtype = (geography_type or '').strip()\n",
|
|
" gid = (geography_id or '').strip()\n",
|
|
" if gtype == 'State':\n",
|
|
" return gid.zfill(2)\n",
|
|
" if gtype == 'County':\n",
|
|
" return gid.zfill(5)\n",
|
|
" return gid\n",
|
|
"\n",
|
|
"\n",
|
|
"def create_summary_table(cur) -> None:\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" create table if not exists {SUMMARY_TABLE} (\n",
|
|
" as_of_date date not null,\n",
|
|
" file_id bigint not null,\n",
|
|
" geography_type text not null,\n",
|
|
" geography_id text not null,\n",
|
|
" geography_desc text,\n",
|
|
" geography_desc_full text,\n",
|
|
" area_data_type text,\n",
|
|
" biz_res text not null default '',\n",
|
|
" technology text not null default '',\n",
|
|
" total_units numeric,\n",
|
|
" speed_02_02 numeric,\n",
|
|
" speed_10_1 numeric,\n",
|
|
" speed_25_3 numeric,\n",
|
|
" speed_100_20 numeric,\n",
|
|
" speed_250_25 numeric,\n",
|
|
" speed_1000_100 numeric,\n",
|
|
" raw jsonb not null,\n",
|
|
" fetched_at timestamptz not null default now(),\n",
|
|
" primary key (as_of_date, file_id, geography_type, geography_id, biz_res, technology)\n",
|
|
" )\n",
|
|
" \"\"\"\n",
|
|
" )\n",
|
|
" cur.execute(\n",
|
|
" f'create index if not exists fcc_bdc_summary_geography_lookup_idx on {SUMMARY_TABLE} (as_of_date, geography_type, geography_id, technology)'\n",
|
|
" )\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_summary_geography(cur, as_of_date: date, *, max_files: int | None = None) -> tuple[int, int]:\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" select file_id, subcategory\n",
|
|
" from {FILES_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" and category = 'Summary'\n",
|
|
" and subcategory in (\n",
|
|
" 'Summary by Geography Type - Other Geographies',\n",
|
|
" 'Summary by Geography Type - Census Place'\n",
|
|
" )\n",
|
|
" order by case subcategory\n",
|
|
" when 'Summary by Geography Type - Other Geographies' then 0\n",
|
|
" when 'Summary by Geography Type - Census Place' then 1\n",
|
|
" else 2\n",
|
|
" end,\n",
|
|
" file_id\n",
|
|
" \"\"\",\n",
|
|
" (as_of_date,),\n",
|
|
" )\n",
|
|
" files = cur.fetchall()\n",
|
|
" if max_files is not None:\n",
|
|
" files = files[:max_files]\n",
|
|
"\n",
|
|
" if not files:\n",
|
|
" return 0, 0\n",
|
|
"\n",
|
|
" cur.execute(f'delete from {SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n",
|
|
"\n",
|
|
" file_count = 0\n",
|
|
" row_count = 0\n",
|
|
" for file_id, subcategory in files:\n",
|
|
" csv_name, df = download_availability_file(int(file_id))\n",
|
|
" if df.empty:\n",
|
|
" continue\n",
|
|
"\n",
|
|
" keep = df[df['geography_type'].isin(['State', 'County'])].copy()\n",
|
|
" if keep.empty:\n",
|
|
" print(f'file_id={file_id} ({subcategory}): no State/County rows found in {csv_name}')\n",
|
|
" continue\n",
|
|
"\n",
|
|
" values = []\n",
|
|
" for row in keep.to_dict('records'):\n",
|
|
" geography_type = str(row.get('geography_type') or '').strip()\n",
|
|
" geography_id = normalize_geography_id(geography_type, str(row.get('geography_id') or ''))\n",
|
|
" if not geography_type or not geography_id:\n",
|
|
" continue\n",
|
|
"\n",
|
|
" values.append(\n",
|
|
" (\n",
|
|
" as_of_date,\n",
|
|
" int(file_id),\n",
|
|
" geography_type,\n",
|
|
" geography_id,\n",
|
|
" row.get('geography_desc'),\n",
|
|
" row.get('geography_desc_full'),\n",
|
|
" row.get('area_data_type'),\n",
|
|
" str(row.get('biz_res') or ''),\n",
|
|
" str(row.get('technology') or ''),\n",
|
|
" to_float(row.get('total_units')),\n",
|
|
" to_float(row.get('speed_02_02')),\n",
|
|
" to_float(row.get('speed_10_1')),\n",
|
|
" to_float(row.get('speed_25_3')),\n",
|
|
" to_float(row.get('speed_100_20')),\n",
|
|
" to_float(row.get('speed_250_25')),\n",
|
|
" to_float(row.get('speed_1000_100')),\n",
|
|
" Json(row),\n",
|
|
" )\n",
|
|
" )\n",
|
|
"\n",
|
|
" if values:\n",
|
|
" execute_values(\n",
|
|
" cur,\n",
|
|
" f\"\"\"\n",
|
|
" insert into {SUMMARY_TABLE} (\n",
|
|
" as_of_date, file_id, geography_type, geography_id,\n",
|
|
" geography_desc, geography_desc_full,\n",
|
|
" area_data_type, biz_res, technology, total_units,\n",
|
|
" speed_02_02, speed_10_1, speed_25_3,\n",
|
|
" speed_100_20, speed_250_25, speed_1000_100,\n",
|
|
" raw\n",
|
|
" )\n",
|
|
" values %s\n",
|
|
" on conflict (as_of_date, file_id, geography_type, geography_id, biz_res, technology)\n",
|
|
" do update set\n",
|
|
" geography_desc = excluded.geography_desc,\n",
|
|
" geography_desc_full = excluded.geography_desc_full,\n",
|
|
" area_data_type = excluded.area_data_type,\n",
|
|
" total_units = excluded.total_units,\n",
|
|
" speed_02_02 = excluded.speed_02_02,\n",
|
|
" speed_10_1 = excluded.speed_10_1,\n",
|
|
" speed_25_3 = excluded.speed_25_3,\n",
|
|
" speed_100_20 = excluded.speed_100_20,\n",
|
|
" speed_250_25 = excluded.speed_250_25,\n",
|
|
" speed_1000_100 = excluded.speed_1000_100,\n",
|
|
" raw = excluded.raw,\n",
|
|
" fetched_at = now()\n",
|
|
" \"\"\",\n",
|
|
" values,\n",
|
|
" page_size=1000,\n",
|
|
" )\n",
|
|
" row_count += len(values)\n",
|
|
" file_count += 1\n",
|
|
" print(f'loaded file_id={file_id} ({subcategory}) from {csv_name}: {len(values):,} state/county rows')\n",
|
|
"\n",
|
|
" return file_count, row_count"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "19",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Phase 2: Ingest Summary Geography Metrics into Connection Table\n",
|
|
"\n",
|
|
"This phase downloads FCC Summary files, stages county/state metrics in PostGIS, and joins the best available geography level back to `public.data_center_broadband_connection`."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "20",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"def connection_null_snapshot(conn) -> pd.DataFrame:\n",
|
|
" return pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" count(*) as total_rows,\n",
|
|
" count(*) filter (where fcc_bdc_geography_type is null) as null_geography_type,\n",
|
|
" count(*) filter (where fcc_bdc_geoid is null) as null_geography_id,\n",
|
|
" count(*) filter (where fcc_summary_json is null) as null_summary_json\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
"\n",
|
|
"with get_conn() as conn:\n",
|
|
" before = connection_null_snapshot(conn)\n",
|
|
" with conn.cursor() as cur:\n",
|
|
" create_summary_table(cur)\n",
|
|
"\n",
|
|
" cur.execute(f'select max(as_of_date) from {FILES_TABLE}')\n",
|
|
" as_of_date = cur.fetchone()[0]\n",
|
|
" if as_of_date is None:\n",
|
|
" raise RuntimeError(f'No as_of_date found in {FILES_TABLE}. Run FCC catalog load first.')\n",
|
|
"\n",
|
|
" loaded_files, loaded_rows = load_summary_geography(cur, as_of_date)\n",
|
|
" if loaded_files == 0:\n",
|
|
" raise RuntimeError('No summary files were loaded into staging table.')\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f'''\n",
|
|
" with county_ranked as (\n",
|
|
" select\n",
|
|
" geography_id,\n",
|
|
" file_id,\n",
|
|
" technology,\n",
|
|
" biz_res,\n",
|
|
" speed_02_02,\n",
|
|
" speed_10_1,\n",
|
|
" speed_25_3,\n",
|
|
" speed_100_20,\n",
|
|
" speed_250_25,\n",
|
|
" speed_1000_100,\n",
|
|
" row_number() over (\n",
|
|
" partition by geography_id\n",
|
|
" order by\n",
|
|
" case when technology = 'Any Technology' then 0 else 1 end,\n",
|
|
" case when biz_res in ('All Locations', 'Total', '') then 0 else 1 end,\n",
|
|
" speed_100_20 desc nulls last,\n",
|
|
" file_id desc\n",
|
|
" ) as rn\n",
|
|
" from {SUMMARY_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" and geography_type = 'County'\n",
|
|
" ),\n",
|
|
" county_best as (\n",
|
|
" select * from county_ranked where rn = 1\n",
|
|
" ),\n",
|
|
" state_ranked as (\n",
|
|
" select\n",
|
|
" geography_id,\n",
|
|
" file_id,\n",
|
|
" technology,\n",
|
|
" biz_res,\n",
|
|
" speed_02_02,\n",
|
|
" speed_10_1,\n",
|
|
" speed_25_3,\n",
|
|
" speed_100_20,\n",
|
|
" speed_250_25,\n",
|
|
" speed_1000_100,\n",
|
|
" row_number() over (\n",
|
|
" partition by geography_id\n",
|
|
" order by\n",
|
|
" case when technology = 'Any Technology' then 0 else 1 end,\n",
|
|
" case when biz_res in ('All Locations', 'Total', '') then 0 else 1 end,\n",
|
|
" speed_100_20 desc nulls last,\n",
|
|
" file_id desc\n",
|
|
" ) as rn\n",
|
|
" from {SUMMARY_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" and geography_type = 'State'\n",
|
|
" ),\n",
|
|
" state_best as (\n",
|
|
" select * from state_ranked where rn = 1\n",
|
|
" ),\n",
|
|
" matched as (\n",
|
|
" select\n",
|
|
" c.master_id,\n",
|
|
" coalesce(cb.geography_id, sb.geography_id) as geography_id,\n",
|
|
" case when cb.geography_id is not null then 'County' else 'State' end as geography_level,\n",
|
|
" coalesce(cb.file_id, sb.file_id) as file_id,\n",
|
|
" coalesce(cb.technology, sb.technology) as technology,\n",
|
|
" coalesce(cb.biz_res, sb.biz_res) as biz_res,\n",
|
|
" coalesce(cb.speed_02_02, sb.speed_02_02) as speed_02_02,\n",
|
|
" coalesce(cb.speed_10_1, sb.speed_10_1) as speed_10_1,\n",
|
|
" coalesce(cb.speed_25_3, sb.speed_25_3) as speed_25_3,\n",
|
|
" coalesce(cb.speed_100_20, sb.speed_100_20) as speed_100_20,\n",
|
|
" coalesce(cb.speed_250_25, sb.speed_250_25) as speed_250_25,\n",
|
|
" coalesce(cb.speed_1000_100, sb.speed_1000_100) as speed_1000_100\n",
|
|
" from {CONNECTION_TABLE} c\n",
|
|
" left join county_best cb\n",
|
|
" on cb.geography_id = left(c.census_tract_geoid, 5)\n",
|
|
" left join state_best sb\n",
|
|
" on sb.geography_id = left(c.census_tract_geoid, 2)\n",
|
|
" where cb.geography_id is not null or sb.geography_id is not null\n",
|
|
" )\n",
|
|
" update {CONNECTION_TABLE} c\n",
|
|
" set\n",
|
|
" fcc_bdc_as_of_date = %s,\n",
|
|
" fcc_bdc_geography_type = m.geography_level,\n",
|
|
" fcc_bdc_geoid = m.geography_id,\n",
|
|
" fcc_summary_json = jsonb_build_object(\n",
|
|
" 'source', 'fcc_summary_download',\n",
|
|
" 'as_of_date', %s::text,\n",
|
|
" 'file_id', m.file_id,\n",
|
|
" 'join_level', m.geography_level,\n",
|
|
" 'technology', m.technology,\n",
|
|
" 'biz_res', m.biz_res,\n",
|
|
" 'speed_02_02', m.speed_02_02,\n",
|
|
" 'speed_10_1', m.speed_10_1,\n",
|
|
" 'speed_25_3', m.speed_25_3,\n",
|
|
" 'speed_100_20', m.speed_100_20,\n",
|
|
" 'speed_250_25', m.speed_250_25,\n",
|
|
" 'speed_1000_100', m.speed_1000_100\n",
|
|
" ),\n",
|
|
" fcc_bdc_status = 'fcc_summary_joined',\n",
|
|
" updated_at = now()\n",
|
|
" from matched m\n",
|
|
" where c.master_id = m.master_id\n",
|
|
" ''',\n",
|
|
" (as_of_date, as_of_date, as_of_date, as_of_date),\n",
|
|
" )\n",
|
|
" updated_rows = cur.rowcount\n",
|
|
"\n",
|
|
" after = connection_null_snapshot(conn)\n",
|
|
"\n",
|
|
"print(f'Using FCC as_of_date: {as_of_date}')\n",
|
|
"print(f'Staging loaded files: {loaded_files:,} | rows: {loaded_rows:,}')\n",
|
|
"print(f'Updated connection rows: {updated_rows:,}')\n",
|
|
"print('\\nNull snapshot before:')\n",
|
|
"display(before)\n",
|
|
"print('Null snapshot after:')\n",
|
|
"display(after)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "21",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with get_conn() as conn:\n",
|
|
" join_counts = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" coalesce(fcc_bdc_geography_type, 'Unmatched') as join_level,\n",
|
|
" count(*) as data_center_count,\n",
|
|
" round(avg((fcc_summary_json ->> 'speed_100_20')::numeric), 6) as avg_speed_100_20,\n",
|
|
" round(avg((fcc_summary_json ->> 'speed_1000_100')::numeric), 6) as avg_speed_1000_100\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" group by 1\n",
|
|
" order by case coalesce(fcc_bdc_geography_type, 'Unmatched')\n",
|
|
" when 'County' then 0\n",
|
|
" when 'State' then 1\n",
|
|
" else 2\n",
|
|
" end\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
" sample_joined = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" master_id,\n",
|
|
" name,\n",
|
|
" state,\n",
|
|
" census_tract_geoid,\n",
|
|
" fcc_bdc_geography_type,\n",
|
|
" fcc_bdc_geoid,\n",
|
|
" fcc_summary_json ->> 'technology' as fcc_technology,\n",
|
|
" fcc_summary_json ->> 'speed_100_20' as fcc_speed_100_20,\n",
|
|
" fcc_summary_json ->> 'speed_1000_100' as fcc_speed_1000_100\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" where fcc_summary_json is not null\n",
|
|
" order by updated_at desc, master_id\n",
|
|
" limit 25\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
"print('Join coverage summary:')\n",
|
|
"display(join_counts)\n",
|
|
"print('Sample joined rows:')\n",
|
|
"display(sample_joined)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "22",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with get_conn() as conn:\n",
|
|
" derived_qa = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" count(*) as total_rows,\n",
|
|
" count(*) filter (where fcc_summary_json is not null) as rows_with_summary_json,\n",
|
|
" count(*) filter (where fcc_max_advertised_download_mbps is not null) as rows_with_max_download,\n",
|
|
" count(*) filter (where fcc_max_advertised_upload_mbps is not null) as rows_with_max_upload,\n",
|
|
" count(*) filter (where fcc_provider_count is not null) as rows_with_provider_count,\n",
|
|
" count(*) filter (where fcc_100_20_provider_count is not null) as rows_with_100_20_provider_count\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
" tier_dist = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" fcc_max_advertised_download_mbps,\n",
|
|
" fcc_max_advertised_upload_mbps,\n",
|
|
" count(*) as row_count\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" where fcc_summary_json is not null\n",
|
|
" group by 1, 2\n",
|
|
" order by 1 desc nulls last, 2 desc nulls last\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
"print('Derived scalar QA:')\n",
|
|
"display(derived_qa)\n",
|
|
"print('Derived tier distribution:')\n",
|
|
"display(tier_dist.head(20))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "23",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with get_conn() as conn:\n",
|
|
" with conn.cursor() as cur:\n",
|
|
" cur.execute(\n",
|
|
" f'''\n",
|
|
" update {CONNECTION_TABLE}\n",
|
|
" set\n",
|
|
" fcc_max_advertised_download_mbps = case\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_1000_100')::numeric, 0) > 0 then 1000\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_250_25')::numeric, 0) > 0 then 250\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_100_20')::numeric, 0) > 0 then 100\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_25_3')::numeric, 0) > 0 then 25\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_10_1')::numeric, 0) > 0 then 10\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_02_02')::numeric, 0) > 0 then 2\n",
|
|
" else null\n",
|
|
" end,\n",
|
|
" fcc_max_advertised_upload_mbps = case\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_1000_100')::numeric, 0) > 0 then 100\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_250_25')::numeric, 0) > 0 then 25\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_100_20')::numeric, 0) > 0 then 20\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_25_3')::numeric, 0) > 0 then 3\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_10_1')::numeric, 0) > 0 then 1\n",
|
|
" when coalesce((fcc_summary_json ->> 'speed_02_02')::numeric, 0) > 0 then 0.2\n",
|
|
" else null\n",
|
|
" end,\n",
|
|
" fcc_bdc_status = case\n",
|
|
" when fcc_bdc_status = 'fcc_summary_joined' then 'fcc_summary_joined_derived'\n",
|
|
" else fcc_bdc_status\n",
|
|
" end,\n",
|
|
" updated_at = now()\n",
|
|
" where fcc_summary_json is not null\n",
|
|
" and fcc_summary_json ->> 'source' = 'fcc_summary_download'\n",
|
|
" '''\n",
|
|
" )\n",
|
|
" derived_rows = cur.rowcount\n",
|
|
"\n",
|
|
"print(f'Derived scalar columns for rows: {derived_rows:,}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "24",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Phase 2C: Ingest Provider Summary Catalog\n",
|
|
"\n",
|
|
"The FCC provider-summary download is provider-level rather than geography-level, so this step stages it separately and uses global provider aggregates to populate the provider count columns."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "25",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"PROVIDER_SUMMARY_TABLE = 'public.fcc_bdc_provider_summary'\n",
|
|
"\n",
|
|
"\n",
|
|
"def classify_provider_technology(technology_code_desc: str | None) -> str:\n",
|
|
" text = (technology_code_desc or '').strip().lower()\n",
|
|
" if not text:\n",
|
|
" return 'Other'\n",
|
|
" if 'fiber' in text:\n",
|
|
" return 'Fiber'\n",
|
|
" if 'cable' in text:\n",
|
|
" return 'Cable'\n",
|
|
" if 'unlicensed fixed wireless' in text or 'licensed fixed wireless' in text or 'fixed wireless' in text:\n",
|
|
" return 'Fixed Wireless'\n",
|
|
" if 'copper' in text:\n",
|
|
" return 'Copper'\n",
|
|
" if 'satellite' in text:\n",
|
|
" return 'Satellite'\n",
|
|
" return 'Other'\n",
|
|
"\n",
|
|
"\n",
|
|
"def create_provider_summary_table(cur) -> None:\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" create table if not exists {PROVIDER_SUMMARY_TABLE} (\n",
|
|
" as_of_date date not null,\n",
|
|
" file_id bigint not null,\n",
|
|
" provider_id bigint not null,\n",
|
|
" holding_company text,\n",
|
|
" technology_code text,\n",
|
|
" technology_code_desc text,\n",
|
|
" provider_class text,\n",
|
|
" location_count_res bigint,\n",
|
|
" unit_count_res bigint,\n",
|
|
" location_count_bus bigint,\n",
|
|
" unit_count_bus bigint,\n",
|
|
" raw jsonb not null,\n",
|
|
" fetched_at timestamptz not null default now(),\n",
|
|
" primary key (as_of_date, file_id, provider_id, technology_code, technology_code_desc)\n",
|
|
" )\n",
|
|
" \"\"\"\n",
|
|
" )\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_provider_summary(cur, as_of_date: date) -> tuple[int, int, int, int, int, int, int]:\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" select file_id\n",
|
|
" from {FILES_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" and category = 'Summary'\n",
|
|
" and subcategory = 'Provider Summary'\n",
|
|
" order by file_id\n",
|
|
" limit 1\n",
|
|
" \"\"\",\n",
|
|
" (as_of_date,),\n",
|
|
" )\n",
|
|
" row = cur.fetchone()\n",
|
|
" if not row:\n",
|
|
" return 0, 0, 0, 0, 0, 0, 0\n",
|
|
"\n",
|
|
" file_id = int(row[0])\n",
|
|
" csv_name, df = download_availability_file(file_id)\n",
|
|
" if df.empty:\n",
|
|
" return file_id, 0, 0, 0, 0, 0, 0\n",
|
|
"\n",
|
|
" cur.execute(f'delete from {PROVIDER_SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n",
|
|
" values = []\n",
|
|
" for row in df.to_dict('records'):\n",
|
|
" provider_id = to_int(row.get('provider_id'))\n",
|
|
" if provider_id is None:\n",
|
|
" continue\n",
|
|
" values.append((\n",
|
|
" as_of_date,\n",
|
|
" file_id,\n",
|
|
" provider_id,\n",
|
|
" row.get('holding_company'),\n",
|
|
" row.get('technology_code'),\n",
|
|
" row.get('technology_code_desc'),\n",
|
|
" classify_provider_technology(row.get('technology_code_desc')),\n",
|
|
" to_int(row.get('location_count_res')),\n",
|
|
" to_int(row.get('unit_count_res')),\n",
|
|
" to_int(row.get('location_count_bus')),\n",
|
|
" to_int(row.get('unit_count_bus')),\n",
|
|
" Json(row),\n",
|
|
" ))\n",
|
|
"\n",
|
|
" if values:\n",
|
|
" execute_values(\n",
|
|
" cur,\n",
|
|
" f\"\"\"\n",
|
|
" insert into {PROVIDER_SUMMARY_TABLE} (\n",
|
|
" as_of_date, file_id, provider_id, holding_company,\n",
|
|
" technology_code, technology_code_desc, provider_class,\n",
|
|
" location_count_res, unit_count_res, location_count_bus, unit_count_bus, raw\n",
|
|
" )\n",
|
|
" values %s\n",
|
|
" on conflict (as_of_date, file_id, provider_id, technology_code, technology_code_desc)\n",
|
|
" do update set\n",
|
|
" holding_company = excluded.holding_company,\n",
|
|
" provider_class = excluded.provider_class,\n",
|
|
" location_count_res = excluded.location_count_res,\n",
|
|
" unit_count_res = excluded.unit_count_res,\n",
|
|
" location_count_bus = excluded.location_count_bus,\n",
|
|
" unit_count_bus = excluded.unit_count_bus,\n",
|
|
" raw = excluded.raw,\n",
|
|
" fetched_at = now()\n",
|
|
" \"\"\",\n",
|
|
" values,\n",
|
|
" page_size=1000,\n",
|
|
" )\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" with provider_stats as (\n",
|
|
" select\n",
|
|
" count(distinct provider_id) as provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fiber') as fiber_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Cable') as cable_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fixed Wireless') as fixed_wireless_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Copper') as copper_provider_count\n",
|
|
" from {PROVIDER_SUMMARY_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" )\n",
|
|
" update {CONNECTION_TABLE} c\n",
|
|
" set\n",
|
|
" fcc_provider_count = s.provider_count,\n",
|
|
" fcc_fiber_provider_count = s.fiber_provider_count,\n",
|
|
" fcc_cable_provider_count = s.cable_provider_count,\n",
|
|
" fcc_fixed_wireless_provider_count = s.fixed_wireless_provider_count,\n",
|
|
" fcc_summary_json = jsonb_set(\n",
|
|
" coalesce(c.fcc_summary_json, '{{}}'::jsonb),\n",
|
|
" '{{provider_summary}}',\n",
|
|
" jsonb_build_object(\n",
|
|
" 'file_id', %s,\n",
|
|
" 'provider_count', s.provider_count,\n",
|
|
" 'fiber_provider_count', s.fiber_provider_count,\n",
|
|
" 'cable_provider_count', s.cable_provider_count,\n",
|
|
" 'fixed_wireless_provider_count', s.fixed_wireless_provider_count,\n",
|
|
" 'copper_provider_count', s.copper_provider_count\n",
|
|
" ),\n",
|
|
" true\n",
|
|
" ),\n",
|
|
" updated_at = now()\n",
|
|
" from provider_stats s\n",
|
|
" where c.fcc_summary_json is not null\n",
|
|
" \"\"\",\n",
|
|
" (as_of_date, file_id),\n",
|
|
" )\n",
|
|
"\n",
|
|
" cur.execute(f'select count(*) from {PROVIDER_SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n",
|
|
" provider_rows = cur.fetchone()[0]\n",
|
|
"\n",
|
|
" cur.execute(\n",
|
|
" f\"\"\"\n",
|
|
" select\n",
|
|
" count(distinct provider_id),\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fiber'),\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Cable'),\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fixed Wireless'),\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Copper')\n",
|
|
" from {PROVIDER_SUMMARY_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" \"\"\",\n",
|
|
" (as_of_date,),\n",
|
|
" )\n",
|
|
" provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count = cur.fetchone()\n",
|
|
" return file_id, provider_rows, provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count\n",
|
|
"\n",
|
|
"\n",
|
|
"with get_conn() as conn:\n",
|
|
" with conn.cursor() as cur:\n",
|
|
" create_provider_summary_table(cur)\n",
|
|
" cur.execute(f'select max(as_of_date) from {FILES_TABLE}')\n",
|
|
" provider_as_of_date = cur.fetchone()[0]\n",
|
|
" if provider_as_of_date is None:\n",
|
|
" raise RuntimeError(f'No as_of_date found in {FILES_TABLE} for provider summary load.')\n",
|
|
" provider_file_id, provider_rows, provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count = load_provider_summary(cur, provider_as_of_date)\n",
|
|
" conn.commit()\n",
|
|
"\n",
|
|
"print(f'Provider summary file_id: {provider_file_id}')\n",
|
|
"print(f'Provider summary rows loaded: {provider_rows:,}')\n",
|
|
"print(f'Provider counts: total={provider_count:,}, fiber={fiber_count:,}, cable={cable_count:,}, fixed_wireless={fixed_wireless_count:,}, copper={copper_count:,}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "26",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with get_conn() as conn:\n",
|
|
" provider_qa = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" count(*) as total_rows,\n",
|
|
" count(*) filter (where fcc_provider_count is not null) as rows_with_provider_count,\n",
|
|
" count(*) filter (where fcc_fiber_provider_count is not null) as rows_with_fiber_provider_count,\n",
|
|
" count(*) filter (where fcc_cable_provider_count is not null) as rows_with_cable_provider_count,\n",
|
|
" count(*) filter (where fcc_fixed_wireless_provider_count is not null) as rows_with_fixed_wireless_provider_count,\n",
|
|
" count(*) filter (where fcc_summary_json -> 'provider_summary' is not null) as rows_with_provider_summary_json\n",
|
|
" from {CONNECTION_TABLE}\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" )\n",
|
|
"\n",
|
|
" provider_agg = pd.read_sql(\n",
|
|
" f'''\n",
|
|
" select\n",
|
|
" count(distinct provider_id) as provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fiber') as fiber_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Cable') as cable_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Fixed Wireless') as fixed_wireless_provider_count,\n",
|
|
" count(distinct provider_id) filter (where provider_class = 'Copper') as copper_provider_count\n",
|
|
" from {PROVIDER_SUMMARY_TABLE}\n",
|
|
" where as_of_date = %s\n",
|
|
" ''',\n",
|
|
" conn,\n",
|
|
" params=(provider_as_of_date,),\n",
|
|
" )\n",
|
|
"\n",
|
|
"print('Provider-summary QA:')\n",
|
|
"display(provider_qa)\n",
|
|
"print('Provider aggregate counts:')\n",
|
|
"display(provider_agg)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "27",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Phase 2B: Derive Scalar Broadband Columns from Summary JSON\n",
|
|
"\n",
|
|
"This step derives scalar speed columns from `fcc_summary_json` for easier SQL use.\n",
|
|
"\n",
|
|
"Notes:\n",
|
|
"- `fcc_max_advertised_download_mbps` / `fcc_max_advertised_upload_mbps` are estimated from the highest speed tier with non-zero availability share.\n",
|
|
"- Provider-count columns are populated from the separate provider-summary catalog, which is global catalog context rather than geography-specific broadband coverage."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "28",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Tables Created by This Notebook and Their Relationships\n",
|
|
"\n",
|
|
"### Tables Created / Maintained\n",
|
|
"1. `public.fcc_bdc_as_of`\n",
|
|
"- Release/version metadata by `as_of_date`.\n",
|
|
"\n",
|
|
"2. `public.fcc_bdc_files`\n",
|
|
"- File-level lineage records for each FCC BDC release.\n",
|
|
"\n",
|
|
"3. `public.fcc_bdc_broadband_by_datacenter`\n",
|
|
"- Per-data-center broadband fact table keyed by `(master_id, as_of_date)`.\n",
|
|
"\n",
|
|
"4. `public.fcc_bdc_broadband_summary`\n",
|
|
"- Release-level aggregate summary metrics.\n",
|
|
"\n",
|
|
"5. `public.fcc_bdc_provider_summary`\n",
|
|
"- Release-level provider catalog and provider-class summary metrics.\n",
|
|
"\n",
|
|
"### Key Relationships\n",
|
|
"- `public.fcc_bdc_as_of (as_of_date)`\n",
|
|
" - 1-to-many -> `public.fcc_bdc_files (as_of_date)`\n",
|
|
" - 1-to-many -> `public.fcc_bdc_broadband_by_datacenter (as_of_date)`\n",
|
|
" - 1-to-many -> `public.fcc_bdc_broadband_summary (as_of_date)`\n",
|
|
" - 1-to-many -> `public.fcc_bdc_provider_summary (as_of_date)`\n",
|
|
"\n",
|
|
"- `public.master_data_centers (master_id)`\n",
|
|
" - 1-to-many over time -> `public.fcc_bdc_broadband_by_datacenter (master_id, as_of_date)`\n",
|
|
"\n",
|
|
"### Rerun Notes\n",
|
|
"- The notebook is designed for repeat refreshes as new FCC releases arrive.\n",
|
|
"- Use `as_of_date` as the version key when comparing snapshots over time."
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": ".venv",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"name": "python",
|
|
"version": "3.14.5"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|