{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# FCC BDC Broadband Connection Tables for Master Data Centers\n", "\n", "Notebook version of [build_fcc_bdc_broadband_connection_table.py](build_fcc_bdc_broadband_connection_table.py).\n", "\n", "Builds and refreshes:\n", "\n", "1. `public.fcc_bdc_api_as_of_dates` - FCC BDC API as-of date catalog\n", "2. `public.fcc_bdc_availability_files` - FCC fixed-broadband availability file catalog for an as-of date\n", "3. `public.data_center_broadband_connection` - per-data-center broadband connection base table\n", "\n", "If FCC credentials are missing, the notebook still rebuilds the base connection table and leaves FCC status as pending." ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "from __future__ import annotations\n", "\n", "import os\n", "import subprocess\n", "from datetime import date, datetime\n", "from pathlib import Path\n", "from typing import Any\n", "\n", "import pandas as pd\n", "import psycopg2\n", "import requests\n", "from psycopg2.extras import Json, execute_values\n", "\n", "pd.set_option('display.max_columns', 200)\n", "\n", "print('pandas: ', pd.__version__)\n", "print('psycopg2:', psycopg2.__version__)" ] }, { "cell_type": "code", "execution_count": null, "id": "2", "metadata": {}, "outputs": [], "source": [ "def load_env_file(env_path: str = '.env') -> None:\n", " p = Path(env_path)\n", " if not p.exists():\n", " print(f'No {env_path} file found in {Path.cwd()}')\n", " return\n", "\n", " loaded = 0\n", " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", " line = raw_line.strip()\n", " if not line or line.startswith('#') or '=' not in line:\n", " continue\n", " key, value = line.split('=', 1)\n", " key = key.strip()\n", " value = value.strip().strip('\\\"').strip(\"'\")\n", " if key and key not in os.environ:\n", " os.environ[key] = value\n", " loaded += 1\n", " print(f'Loaded {loaded} env var(s) from {env_path}')\n", "\n", "\n", "def load_zsh_secrets() -> None:\n", " secrets = Path.home() / '.zsh_secrets'\n", " if not secrets.exists():\n", " return\n", "\n", " result = subprocess.run(\n", " ['zsh', '-lc', 'source ~/.zsh_secrets >/dev/null 2>&1; env'],\n", " check=True,\n", " capture_output=True,\n", " text=True,\n", " )\n", " for line in result.stdout.splitlines():\n", " if '=' not in line:\n", " continue\n", " key, value = line.split('=', 1)\n", " if key and key not in os.environ:\n", " os.environ[key] = value\n", "\n", "\n", "def require_env(keys: list[str]) -> None:\n", " missing = [k for k in keys if not os.getenv(k)]\n", " if missing:\n", " raise RuntimeError('Missing required env vars: ' + ', '.join(missing))\n", "\n", "\n", "load_env_file('.env')\n", "load_zsh_secrets()\n", "require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])" ] }, { "cell_type": "code", "execution_count": null, "id": "3", "metadata": {}, "outputs": [], "source": [ "DB_NAME = 'data_centers'\n", "\n", "MASTER_TABLE = 'public.master_data_centers'\n", "TRACT_TABLE = 'public.data_center_census_tracts_2024'\n", "AS_OF_TABLE = 'public.fcc_bdc_api_as_of_dates'\n", "FILES_TABLE = 'public.fcc_bdc_availability_files'\n", "CONNECTION_TABLE = 'public.data_center_broadband_connection'\n", "\n", "FCC_BASE_URL = 'https://broadbandmap.fcc.gov/api/public'\n", "USER_AGENT = 'data-center-fcc-bdc-loader/1.0'\n", "\n", "\n", "def get_conn():\n", " return psycopg2.connect(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " user=os.environ['PGWEB_USER'],\n", " password=os.environ['PGWEB_PASSWORD'],\n", " dbname=DB_NAME,\n", " )\n", "\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select current_database(), current_user')\n", " print('Connected:', cur.fetchone())\n", " cur.execute('create extension if not exists postgis')\n", " for t in (MASTER_TABLE, TRACT_TABLE):\n", " cur.execute('select to_regclass(%s)', (t,))\n", " status = 'OK' if cur.fetchone()[0] is not None else 'MISSING'\n", " print(f'{t}: {status}')" ] }, { "cell_type": "markdown", "id": "4", "metadata": {}, "source": [ "## Parameters" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "# Set True to build only the base connection table and skip FCC API calls.\n", "SKIP_FCC = False\n", "\n", "# Optional override in YYYY-MM-DD format; when None, uses latest from FCC API.\n", "AS_OF_DATE_OVERRIDE = None" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "## Create Tables and Base Utilities" ] }, { "cell_type": "code", "execution_count": null, "id": "7", "metadata": {}, "outputs": [], "source": [ "def fcc_credentials() -> tuple[str | None, str | None]:\n", " username = os.getenv('FCC_USERNAME') or os.getenv('FCC_BDC_USERNAME')\n", " hash_value = os.getenv('FCC_API_KEY') or os.getenv('FCC_HASH_VALUE')\n", " return username, hash_value\n", "\n", "\n", "def fcc_get(path: str, *, params: dict[str, Any] | None = None) -> dict[str, Any]:\n", " username, hash_value = fcc_credentials()\n", " if not username or not hash_value:\n", " raise RuntimeError(\n", " 'FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus FCC_API_KEY or FCC_HASH_VALUE.'\n", " )\n", "\n", " url = f'{FCC_BASE_URL}{path}'\n", " headers = {\n", " 'username': username,\n", " 'hash_value': hash_value,\n", " 'user-agent': USER_AGENT,\n", " 'accept': 'application/json',\n", " }\n", " response = requests.get(url, headers=headers, params=params or {}, timeout=60)\n", " response.raise_for_status()\n", " payload = response.json()\n", " if str(payload.get('status_code')) in {'401', '403'} or payload.get('status') == 'fail':\n", " raise RuntimeError(f'FCC API error for {path}: {payload}')\n", " return payload\n", "\n", "\n", "def parse_date(value: Any) -> date | None:\n", " if value in (None, ''):\n", " return None\n", " if isinstance(value, date):\n", " return value\n", " return datetime.strptime(str(value)[:10], '%Y-%m-%d').date()\n", "\n", "\n", "def to_int(value: Any) -> int | None:\n", " if value in (None, ''):\n", " return None\n", " try:\n", " return int(str(value).replace(',', ''))\n", " except (TypeError, ValueError):\n", " return None\n", "\n", "\n", "def create_tables(cur) -> None:\n", " cur.execute('create extension if not exists postgis')\n", "\n", " cur.execute(\n", " f\"\"\"\n", " create table if not exists {AS_OF_TABLE} (\n", " data_type text not null,\n", " as_of_date date not null,\n", " raw jsonb not null,\n", " fetched_at timestamptz not null default now(),\n", " primary key (data_type, as_of_date)\n", " )\n", " \"\"\"\n", " )\n", "\n", " cur.execute(\n", " f\"\"\"\n", " create table if not exists {FILES_TABLE} (\n", " as_of_date date not null,\n", " file_id bigint not null,\n", " category text,\n", " subcategory text,\n", " technology_type text,\n", " technology_code text,\n", " technology_code_desc text,\n", " speed_tier text,\n", " state_fips text,\n", " state_name text,\n", " provider_id bigint,\n", " provider_name text,\n", " file_type text,\n", " file_name text,\n", " record_count bigint,\n", " raw jsonb not null,\n", " fetched_at timestamptz not null default now(),\n", " primary key (as_of_date, file_id)\n", " )\n", " \"\"\"\n", " )\n", " cur.execute(\n", " f'create index if not exists fcc_bdc_availability_files_category_idx on {FILES_TABLE} (category, subcategory)'\n", " )\n", " cur.execute(\n", " f'create index if not exists fcc_bdc_availability_files_state_idx on {FILES_TABLE} (state_fips)'\n", " )\n", " cur.execute(\n", " f'create index if not exists fcc_bdc_availability_files_provider_idx on {FILES_TABLE} (provider_id)'\n", " )\n", "\n", " cur.execute(\n", " f\"\"\"\n", " create table if not exists {CONNECTION_TABLE} (\n", " master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n", " source text,\n", " name text,\n", " operator text,\n", " city text,\n", " state text,\n", " country text,\n", " longitude double precision,\n", " latitude double precision,\n", " geom geometry(Point, 4326),\n", " census_tract_geoid text,\n", " census_broadband_subscription_pct numeric,\n", " fcc_bdc_status text not null,\n", " fcc_bdc_as_of_date date,\n", " fcc_bdc_geography_type text,\n", " fcc_bdc_geoid text,\n", " fcc_provider_count integer,\n", " fcc_fiber_provider_count integer,\n", " fcc_cable_provider_count integer,\n", " fcc_fixed_wireless_provider_count integer,\n", " fcc_max_advertised_download_mbps numeric,\n", " fcc_max_advertised_upload_mbps numeric,\n", " fcc_100_20_provider_count integer,\n", " fcc_summary_json jsonb,\n", " fetched_at timestamptz not null default now(),\n", " updated_at timestamptz not null default now()\n", " )\n", " \"\"\"\n", " )\n", " cur.execute(\n", " f'create index if not exists data_center_broadband_connection_geom_gix on {CONNECTION_TABLE} using gist (geom)'\n", " )\n", " cur.execute(\n", " f'create index if not exists data_center_broadband_connection_tract_idx on {CONNECTION_TABLE} (census_tract_geoid)'\n", " )\n", " cur.execute(\n", " f'create index if not exists data_center_broadband_connection_status_idx on {CONNECTION_TABLE} (fcc_bdc_status)'\n", " )\n", "\n", "\n", "def rebuild_connection_base(cur, status: str) -> int:\n", " cur.execute(f'truncate {CONNECTION_TABLE}')\n", " cur.execute(\n", " f\"\"\"\n", " insert into {CONNECTION_TABLE} (\n", " master_id, source, name, operator, city, state, country,\n", " longitude, latitude, geom,\n", " census_tract_geoid, census_broadband_subscription_pct,\n", " fcc_bdc_status\n", " )\n", " select\n", " dc.master_id, dc.source, dc.name, dc.operator, dc.city, dc.state, dc.country,\n", " dc.longitude, dc.latitude, dc.geom,\n", " dc.geoid as census_tract_geoid,\n", " tr.broadband_subscription_pct as census_broadband_subscription_pct,\n", " %s as fcc_bdc_status\n", " from {MASTER_TABLE} dc\n", " left join {TRACT_TABLE} tr on tr.geoid::text = dc.geoid::text\n", " \"\"\",\n", " (status,),\n", " )\n", " cur.execute(f'select count(*) from {CONNECTION_TABLE}')\n", " return cur.fetchone()[0]" ] }, { "cell_type": "markdown", "id": "8", "metadata": {}, "source": [ "## FCC Catalog Load Functions" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "def latest_availability_date(rows: list[dict[str, Any]]) -> date | None:\n", " dates = [\n", " parse_date(r.get('as_of_date'))\n", " for r in rows\n", " if str(r.get('data_type', '')).lower() in {'availability', 'availability data'}\n", " ]\n", " dates = [d for d in dates if d is not None]\n", " return max(dates) if dates else None\n", "\n", "\n", "def load_as_of_dates(cur) -> date:\n", " payload = fcc_get('/map/listAsOfDates')\n", " rows = payload.get('data') or []\n", " values = []\n", " for row in rows:\n", " as_of_date = parse_date(row.get('as_of_date'))\n", " if not as_of_date:\n", " continue\n", " values.append((row.get('data_type'), as_of_date, Json(row)))\n", "\n", " if values:\n", " execute_values(\n", " cur,\n", " f\"\"\"\n", " insert into {AS_OF_TABLE} (data_type, as_of_date, raw)\n", " values %s\n", " on conflict (data_type, as_of_date) do update set\n", " raw = excluded.raw,\n", " fetched_at = now()\n", " \"\"\",\n", " values,\n", " page_size=1000,\n", " )\n", "\n", " latest = latest_availability_date(rows)\n", " if latest is None:\n", " raise RuntimeError(f'Could not find an availability as_of_date in FCC response: {rows}')\n", " return latest\n", "\n", "\n", "def load_availability_file_catalog(cur, as_of_date: date) -> int:\n", " payload = fcc_get(\n", " f'/map/downloads/listAvailabilityData/{as_of_date:%Y-%m-%d}',\n", " params={'technology_type': 'Fixed Broadband'},\n", " )\n", " rows = payload.get('data') or []\n", " values = []\n", " for row in rows:\n", " file_id = to_int(row.get('file_id'))\n", " if file_id is None:\n", " continue\n", " values.append(\n", " (\n", " as_of_date,\n", " file_id,\n", " row.get('category'),\n", " row.get('subcategory'),\n", " row.get('technology_type'),\n", " row.get('technology_code'),\n", " row.get('technology_code_desc'),\n", " row.get('speed_tier'),\n", " row.get('state_fips'),\n", " row.get('state_name'),\n", " to_int(row.get('provider_id')),\n", " row.get('provider_name'),\n", " row.get('file_type'),\n", " row.get('file_name'),\n", " to_int(row.get('record_count')),\n", " Json(row),\n", " )\n", " )\n", "\n", " if values:\n", " cur.execute(f'delete from {FILES_TABLE} where as_of_date = %s', (as_of_date,))\n", " execute_values(\n", " cur,\n", " f\"\"\"\n", " insert into {FILES_TABLE} (\n", " as_of_date, file_id, category, subcategory, technology_type,\n", " technology_code, technology_code_desc, speed_tier, state_fips,\n", " state_name, provider_id, provider_name, file_type, file_name,\n", " record_count, raw\n", " )\n", " values %s\n", " \"\"\",\n", " values,\n", " page_size=1000,\n", " )\n", " return len(values)" ] }, { "cell_type": "markdown", "id": "10", "metadata": {}, "source": [ "## Run Load Pipeline\n", "\n", "This cell mirrors the script's `main()` behavior." ] }, { "cell_type": "code", "execution_count": null, "id": "11", "metadata": {}, "outputs": [], "source": [ "username, hash_value = fcc_credentials()\n", "status = 'pending_fcc_username' if hash_value and not username else 'pending_fcc_catalog'\n", "if SKIP_FCC:\n", " status = 'fcc_skipped'\n", "\n", "as_of_date = None\n", "n_files = 0\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " create_tables(cur)\n", " n_connection = rebuild_connection_base(cur, status)\n", " print(f'{CONNECTION_TABLE}: {n_connection:,} base rows')\n", "\n", " if SKIP_FCC:\n", " conn.commit()\n", " print('FCC load skipped (SKIP_FCC=True).')\n", " elif not username or not hash_value:\n", " conn.commit()\n", " print('FCC catalog not loaded: set FCC_USERNAME/FCC_BDC_USERNAME and FCC_API_KEY/FCC_HASH_VALUE.')\n", " else:\n", " as_of_date = parse_date(AS_OF_DATE_OVERRIDE) if AS_OF_DATE_OVERRIDE else load_as_of_dates(cur)\n", " n_files = load_availability_file_catalog(cur, as_of_date)\n", "\n", " cur.execute(\n", " f\"\"\"\n", " update {CONNECTION_TABLE}\n", " set fcc_bdc_status = 'fcc_catalog_loaded',\n", " fcc_bdc_as_of_date = %s,\n", " updated_at = now()\n", " \"\"\",\n", " (as_of_date,),\n", " )\n", " conn.commit()\n", "\n", "if as_of_date is not None:\n", " print(f'{AS_OF_TABLE}: loaded latest availability date {as_of_date}')\n", " print(f'{FILES_TABLE}: {n_files:,} fixed-broadband file catalog rows')" ] }, { "cell_type": "markdown", "id": "12", "metadata": {}, "source": [ "## Quick QA" ] }, { "cell_type": "code", "execution_count": null, "id": "13", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " q1 = f\"\"\"\n", " select fcc_bdc_status, count(*) as n\n", " from {CONNECTION_TABLE}\n", " group by 1\n", " order by 2 desc\n", " \"\"\"\n", " display(pd.read_sql(q1, conn))\n", "\n", " q2 = f\"\"\"\n", " select as_of_date, count(*) as file_rows\n", " from {FILES_TABLE}\n", " group by 1\n", " order by as_of_date desc\n", " limit 10\n", " \"\"\"\n", " display(pd.read_sql(q2, conn))\n", "\n", " q3 = f\"\"\"\n", " select master_id, name, state, fcc_bdc_status, fcc_bdc_as_of_date\n", " from {CONNECTION_TABLE}\n", " order by master_id\n", " limit 20\n", " \"\"\"\n", " display(pd.read_sql(q3, conn))" ] }, { "cell_type": "markdown", "id": "14", "metadata": {}, "source": [ "## Tables Created and Relationships\n", "\n", "### Tables Created\n", "\n", "1. **`public.fcc_bdc_api_as_of_dates`**\n", " - Purpose: Stores FCC API-reported as-of dates by data type.\n", " - Grain: one row per `(data_type, as_of_date)`.\n", " - Key columns:\n", " - `data_type`\n", " - `as_of_date`\n", " - `raw` (full FCC response payload for that row)\n", "\n", "2. **`public.fcc_bdc_availability_files`**\n", " - Purpose: Stores FCC fixed-broadband availability file catalog entries for a specific as-of date.\n", " - Grain: one row per `(as_of_date, file_id)`.\n", " - Key columns:\n", " - `as_of_date`\n", " - `file_id`\n", " - `category`, `subcategory`, `technology_type`, `technology_code_desc`\n", " - `state_fips`, `state_name`\n", " - `provider_id`, `provider_name`\n", " - `file_name`, `record_count`, `raw`\n", "\n", "3. **`public.fcc_bdc_provider_summary`**\n", " - Purpose: Stores provider-summary rows from the FCC provider-level download.\n", " - Grain: one row per `(as_of_date, file_id, provider_id, technology_code, technology_code_desc)`.\n", " - Key columns:\n", " - `provider_id`, `holding_company`\n", " - `technology_code`, `technology_code_desc`, `provider_class`\n", " - `location_count_res`, `unit_count_res`, `location_count_bus`, `unit_count_bus`\n", "\n", "4. **`public.fcc_bdc_summary_geography`**\n", " - Purpose: Stores FCC summary-by-geography rows for states and counties.\n", " - Grain: one row per `(as_of_date, file_id, geography_type, geography_id, biz_res, technology)`.\n", " - Key columns:\n", " - `geography_type`, `geography_id`, `geography_desc_full`\n", " - `technology`, `biz_res`\n", " - `total_units`, `speed_02_02`, `speed_10_1`, `speed_25_3`, `speed_100_20`, `speed_250_25`, `speed_1000_100`\n", "\n", "5. **`public.data_center_broadband_connection`**\n", " - Purpose: One-row-per-data-center connection profile and FCC load status.\n", " - Grain: one row per `master_id`.\n", " - Key columns:\n", " - `master_id` (PK)\n", " - core DC attributes copied from `public.master_data_centers`\n", " - `census_tract_geoid`, `census_broadband_subscription_pct`\n", " - FCC status/tracking fields: `fcc_bdc_status`, `fcc_bdc_as_of_date`\n", " - scalar summary fields for download/upload speed, provider counts, and `fcc_summary_json`\n", "\n", "### Relationships\n", "\n", "- `public.master_data_centers` -> `public.data_center_broadband_connection`\n", " - Relationship: **1:1 by `master_id`**\n", " - Enforced by foreign key on `data_center_broadband_connection.master_id`.\n", "\n", "- `public.data_center_census_tracts_2024` -> `public.data_center_broadband_connection`\n", " - Relationship: **many:1 via tract GEOID** during base rebuild.\n", " - Join used in notebook: `data_center_census_tracts_2024.geoid::text = master_data_centers.geoid::text`.\n", "\n", "- `public.fcc_bdc_api_as_of_dates` -> `public.fcc_bdc_availability_files`\n", " - Relationship: **1:many by `as_of_date`** (logical relationship).\n", " - Not enforced with an explicit FK, but both tables are connected by matching `as_of_date`.\n", "\n", "- `public.fcc_bdc_availability_files` -> `public.data_center_broadband_connection`\n", " - Current relationship: **status/date attribution plus summary-file discovery**.\n", " - The notebook uses the availability catalog to find the summary and provider downloads for the current as-of date.\n", "\n", "- `public.fcc_bdc_summary_geography` -> `public.data_center_broadband_connection`\n", " - Relationship: **many:1 via county/state GEOID fallback**.\n", " - County rows are matched on the first 5 digits of `census_tract_geoid`; state rows are used as a fallback.\n", "\n", "- `public.fcc_bdc_provider_summary` -> `public.data_center_broadband_connection`\n", " - Relationship: **global aggregate context**.\n", " - Provider-count columns are filled from provider-summary aggregates because the provider file is not geography-specific.\n", "\n", "### Load Behavior Summary\n", "\n", "- Base rebuild always refreshes `public.data_center_broadband_connection` from master DC + tract context.\n", "- If FCC credentials are available and `SKIP_FCC=False`:\n", " - latest (or overridden) as-of dates are loaded into `public.fcc_bdc_api_as_of_dates`\n", " - file catalog rows for that as-of date are loaded into `public.fcc_bdc_availability_files`\n", " - summary-by-geography rows are staged into `public.fcc_bdc_summary_geography`\n", " - provider-summary rows are staged into `public.fcc_bdc_provider_summary`\n", " - `public.data_center_broadband_connection` is updated with FCC summary linkage and derived scalar fields" ] }, { "cell_type": "markdown", "id": "15", "metadata": {}, "source": [ "## Inspect FCC API Download Content\n", "\n", "This section checks what the FCC API actually returns in downloadable availability summary files, so we can map real fields to the currently-null `data_center_broadband_connection` columns." ] }, { "cell_type": "code", "execution_count": null, "id": "16", "metadata": {}, "outputs": [], "source": [ "import io\n", "import zipfile\n", "\n", "\n", "def download_availability_file(file_id: int, *, file_type: int | None = None) -> tuple[str, pd.DataFrame]:\n", " path = f'/map/downloads/downloadFile/availability/{int(file_id)}'\n", " if file_type is not None:\n", " path = f'{path}/{int(file_type)}'\n", "\n", " payload = fcc_get(path) if False else None # keep linter quiet; endpoint returns binary, not JSON\n", "\n", " username, hash_value = fcc_credentials()\n", " if not username or not hash_value:\n", " raise RuntimeError('FCC credentials are required to download files.')\n", "\n", " headers = {\n", " 'username': username,\n", " 'hash_value': hash_value,\n", " 'user-agent': USER_AGENT,\n", " 'accept': '*/*',\n", " }\n", " response = requests.get(f'{FCC_BASE_URL}{path}', headers=headers, timeout=120)\n", " response.raise_for_status()\n", "\n", " content_type = (response.headers.get('content-type') or '').lower()\n", " if 'application/zip' not in content_type and not response.content.startswith(b'PK'):\n", " raise RuntimeError(f'Expected a ZIP payload, got content-type={content_type}')\n", "\n", " with zipfile.ZipFile(io.BytesIO(response.content)) as zf:\n", " csv_members = [n for n in zf.namelist() if n.lower().endswith('.csv')]\n", " if not csv_members:\n", " raise RuntimeError(f'ZIP has no CSV members: {zf.namelist()}')\n", " csv_name = csv_members[0]\n", " with zf.open(csv_name) as f:\n", " df = pd.read_csv(f, low_memory=False)\n", " return csv_name, df\n", "\n", "\n", "with get_conn() as conn:\n", " summary_files = pd.read_sql(\n", " f'''\n", " select as_of_date, file_id, category, subcategory, technology_type, file_type, file_name, record_count\n", " from {FILES_TABLE}\n", " where category = 'Summary'\n", " order by as_of_date desc,\n", " case subcategory when 'Summary by Geography Type - Other Geographies' then 0\n", " when 'Summary by Geography Type - Census Place' then 1\n", " when 'Provider Summary' then 2\n", " else 3 end,\n", " file_id\n", " ''',\n", " conn,\n", " )\n", "\n", "display(summary_files.head(25))\n", "print(f'Summary files available: {len(summary_files):,}')" ] }, { "cell_type": "code", "execution_count": null, "id": "17", "metadata": {}, "outputs": [], "source": [ "if summary_files.empty:\n", " raise RuntimeError('No Summary files found in FCC catalog table.')\n", "\n", "inspect_rows = []\n", "for _, r in summary_files.head(3).iterrows():\n", " csv_name, df_inspect = download_availability_file(int(r['file_id']))\n", " cols = list(df_inspect.columns)\n", " inspect_rows.append({\n", " 'file_id': int(r['file_id']),\n", " 'subcategory': r['subcategory'],\n", " 'csv_name': csv_name,\n", " 'rows': len(df_inspect),\n", " 'cols': len(cols),\n", " 'sample_columns': ', '.join(cols[:12]),\n", " })\n", "\n", "inspect_df = pd.DataFrame(inspect_rows)\n", "display(inspect_df)\n", "\n", "target_tokens = [\n", " 'provider', 'technology', 'speed', 'download', 'upload', 'geography', 'geoid', 'state', 'county', 'place'\n", "]\n", "\n", "first_file_id = int(summary_files.iloc[0]['file_id'])\n", "first_csv, first_df = download_availability_file(first_file_id)\n", "matching_cols = [c for c in first_df.columns if any(t in c.lower() for t in target_tokens)]\n", "\n", "print(f'Inspected file_id={first_file_id}, csv={first_csv}, rows={len(first_df):,}, cols={len(first_df.columns):,}')\n", "print('Columns that may map to broadband summary fields:')\n", "for c in matching_cols:\n", " print(' -', c)\n", "\n", "display(first_df.head(10))" ] }, { "cell_type": "code", "execution_count": null, "id": "18", "metadata": {}, "outputs": [], "source": [ "SUMMARY_TABLE = 'public.fcc_bdc_summary_geography'\n", "\n", "\n", "def to_float(value):\n", " if value in (None, '', 'NA', 'N/A'):\n", " return None\n", " try:\n", " return float(str(value).replace(',', '').strip())\n", " except (TypeError, ValueError):\n", " return None\n", "\n", "\n", "def normalize_geography_id(geography_type: str, geography_id: str) -> str:\n", " gtype = (geography_type or '').strip()\n", " gid = (geography_id or '').strip()\n", " if gtype == 'State':\n", " return gid.zfill(2)\n", " if gtype == 'County':\n", " return gid.zfill(5)\n", " return gid\n", "\n", "\n", "def create_summary_table(cur) -> None:\n", " cur.execute(\n", " f\"\"\"\n", " create table if not exists {SUMMARY_TABLE} (\n", " as_of_date date not null,\n", " file_id bigint not null,\n", " geography_type text not null,\n", " geography_id text not null,\n", " geography_desc text,\n", " geography_desc_full text,\n", " area_data_type text,\n", " biz_res text not null default '',\n", " technology text not null default '',\n", " total_units numeric,\n", " speed_02_02 numeric,\n", " speed_10_1 numeric,\n", " speed_25_3 numeric,\n", " speed_100_20 numeric,\n", " speed_250_25 numeric,\n", " speed_1000_100 numeric,\n", " raw jsonb not null,\n", " fetched_at timestamptz not null default now(),\n", " primary key (as_of_date, file_id, geography_type, geography_id, biz_res, technology)\n", " )\n", " \"\"\"\n", " )\n", " cur.execute(\n", " f'create index if not exists fcc_bdc_summary_geography_lookup_idx on {SUMMARY_TABLE} (as_of_date, geography_type, geography_id, technology)'\n", " )\n", "\n", "\n", "def load_summary_geography(cur, as_of_date: date, *, max_files: int | None = None) -> tuple[int, int]:\n", " cur.execute(\n", " f\"\"\"\n", " select file_id, subcategory\n", " from {FILES_TABLE}\n", " where as_of_date = %s\n", " and category = 'Summary'\n", " and subcategory in (\n", " 'Summary by Geography Type - Other Geographies',\n", " 'Summary by Geography Type - Census Place'\n", " )\n", " order by case subcategory\n", " when 'Summary by Geography Type - Other Geographies' then 0\n", " when 'Summary by Geography Type - Census Place' then 1\n", " else 2\n", " end,\n", " file_id\n", " \"\"\",\n", " (as_of_date,),\n", " )\n", " files = cur.fetchall()\n", " if max_files is not None:\n", " files = files[:max_files]\n", "\n", " if not files:\n", " return 0, 0\n", "\n", " cur.execute(f'delete from {SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n", "\n", " file_count = 0\n", " row_count = 0\n", " for file_id, subcategory in files:\n", " csv_name, df = download_availability_file(int(file_id))\n", " if df.empty:\n", " continue\n", "\n", " keep = df[df['geography_type'].isin(['State', 'County'])].copy()\n", " if keep.empty:\n", " print(f'file_id={file_id} ({subcategory}): no State/County rows found in {csv_name}')\n", " continue\n", "\n", " values = []\n", " for row in keep.to_dict('records'):\n", " geography_type = str(row.get('geography_type') or '').strip()\n", " geography_id = normalize_geography_id(geography_type, str(row.get('geography_id') or ''))\n", " if not geography_type or not geography_id:\n", " continue\n", "\n", " values.append(\n", " (\n", " as_of_date,\n", " int(file_id),\n", " geography_type,\n", " geography_id,\n", " row.get('geography_desc'),\n", " row.get('geography_desc_full'),\n", " row.get('area_data_type'),\n", " str(row.get('biz_res') or ''),\n", " str(row.get('technology') or ''),\n", " to_float(row.get('total_units')),\n", " to_float(row.get('speed_02_02')),\n", " to_float(row.get('speed_10_1')),\n", " to_float(row.get('speed_25_3')),\n", " to_float(row.get('speed_100_20')),\n", " to_float(row.get('speed_250_25')),\n", " to_float(row.get('speed_1000_100')),\n", " Json(row),\n", " )\n", " )\n", "\n", " if values:\n", " execute_values(\n", " cur,\n", " f\"\"\"\n", " insert into {SUMMARY_TABLE} (\n", " as_of_date, file_id, geography_type, geography_id,\n", " geography_desc, geography_desc_full,\n", " area_data_type, biz_res, technology, total_units,\n", " speed_02_02, speed_10_1, speed_25_3,\n", " speed_100_20, speed_250_25, speed_1000_100,\n", " raw\n", " )\n", " values %s\n", " on conflict (as_of_date, file_id, geography_type, geography_id, biz_res, technology)\n", " do update set\n", " geography_desc = excluded.geography_desc,\n", " geography_desc_full = excluded.geography_desc_full,\n", " area_data_type = excluded.area_data_type,\n", " total_units = excluded.total_units,\n", " speed_02_02 = excluded.speed_02_02,\n", " speed_10_1 = excluded.speed_10_1,\n", " speed_25_3 = excluded.speed_25_3,\n", " speed_100_20 = excluded.speed_100_20,\n", " speed_250_25 = excluded.speed_250_25,\n", " speed_1000_100 = excluded.speed_1000_100,\n", " raw = excluded.raw,\n", " fetched_at = now()\n", " \"\"\",\n", " values,\n", " page_size=1000,\n", " )\n", " row_count += len(values)\n", " file_count += 1\n", " print(f'loaded file_id={file_id} ({subcategory}) from {csv_name}: {len(values):,} state/county rows')\n", "\n", " return file_count, row_count" ] }, { "cell_type": "markdown", "id": "19", "metadata": {}, "source": [ "## Phase 2: Ingest Summary Geography Metrics into Connection Table\n", "\n", "This phase downloads FCC Summary files, stages county/state metrics in PostGIS, and joins the best available geography level back to `public.data_center_broadband_connection`." ] }, { "cell_type": "code", "execution_count": null, "id": "20", "metadata": {}, "outputs": [], "source": [ "def connection_null_snapshot(conn) -> pd.DataFrame:\n", " return pd.read_sql(\n", " f'''\n", " select\n", " count(*) as total_rows,\n", " count(*) filter (where fcc_bdc_geography_type is null) as null_geography_type,\n", " count(*) filter (where fcc_bdc_geoid is null) as null_geography_id,\n", " count(*) filter (where fcc_summary_json is null) as null_summary_json\n", " from {CONNECTION_TABLE}\n", " ''',\n", " conn,\n", " )\n", "\n", "\n", "with get_conn() as conn:\n", " before = connection_null_snapshot(conn)\n", " with conn.cursor() as cur:\n", " create_summary_table(cur)\n", "\n", " cur.execute(f'select max(as_of_date) from {FILES_TABLE}')\n", " as_of_date = cur.fetchone()[0]\n", " if as_of_date is None:\n", " raise RuntimeError(f'No as_of_date found in {FILES_TABLE}. Run FCC catalog load first.')\n", "\n", " loaded_files, loaded_rows = load_summary_geography(cur, as_of_date)\n", " if loaded_files == 0:\n", " raise RuntimeError('No summary files were loaded into staging table.')\n", "\n", " cur.execute(\n", " f'''\n", " with county_ranked as (\n", " select\n", " geography_id,\n", " file_id,\n", " technology,\n", " biz_res,\n", " speed_02_02,\n", " speed_10_1,\n", " speed_25_3,\n", " speed_100_20,\n", " speed_250_25,\n", " speed_1000_100,\n", " row_number() over (\n", " partition by geography_id\n", " order by\n", " case when technology = 'Any Technology' then 0 else 1 end,\n", " case when biz_res in ('All Locations', 'Total', '') then 0 else 1 end,\n", " speed_100_20 desc nulls last,\n", " file_id desc\n", " ) as rn\n", " from {SUMMARY_TABLE}\n", " where as_of_date = %s\n", " and geography_type = 'County'\n", " ),\n", " county_best as (\n", " select * from county_ranked where rn = 1\n", " ),\n", " state_ranked as (\n", " select\n", " geography_id,\n", " file_id,\n", " technology,\n", " biz_res,\n", " speed_02_02,\n", " speed_10_1,\n", " speed_25_3,\n", " speed_100_20,\n", " speed_250_25,\n", " speed_1000_100,\n", " row_number() over (\n", " partition by geography_id\n", " order by\n", " case when technology = 'Any Technology' then 0 else 1 end,\n", " case when biz_res in ('All Locations', 'Total', '') then 0 else 1 end,\n", " speed_100_20 desc nulls last,\n", " file_id desc\n", " ) as rn\n", " from {SUMMARY_TABLE}\n", " where as_of_date = %s\n", " and geography_type = 'State'\n", " ),\n", " state_best as (\n", " select * from state_ranked where rn = 1\n", " ),\n", " matched as (\n", " select\n", " c.master_id,\n", " coalesce(cb.geography_id, sb.geography_id) as geography_id,\n", " case when cb.geography_id is not null then 'County' else 'State' end as geography_level,\n", " coalesce(cb.file_id, sb.file_id) as file_id,\n", " coalesce(cb.technology, sb.technology) as technology,\n", " coalesce(cb.biz_res, sb.biz_res) as biz_res,\n", " coalesce(cb.speed_02_02, sb.speed_02_02) as speed_02_02,\n", " coalesce(cb.speed_10_1, sb.speed_10_1) as speed_10_1,\n", " coalesce(cb.speed_25_3, sb.speed_25_3) as speed_25_3,\n", " coalesce(cb.speed_100_20, sb.speed_100_20) as speed_100_20,\n", " coalesce(cb.speed_250_25, sb.speed_250_25) as speed_250_25,\n", " coalesce(cb.speed_1000_100, sb.speed_1000_100) as speed_1000_100\n", " from {CONNECTION_TABLE} c\n", " left join county_best cb\n", " on cb.geography_id = left(c.census_tract_geoid, 5)\n", " left join state_best sb\n", " on sb.geography_id = left(c.census_tract_geoid, 2)\n", " where cb.geography_id is not null or sb.geography_id is not null\n", " )\n", " update {CONNECTION_TABLE} c\n", " set\n", " fcc_bdc_as_of_date = %s,\n", " fcc_bdc_geography_type = m.geography_level,\n", " fcc_bdc_geoid = m.geography_id,\n", " fcc_summary_json = jsonb_build_object(\n", " 'source', 'fcc_summary_download',\n", " 'as_of_date', %s::text,\n", " 'file_id', m.file_id,\n", " 'join_level', m.geography_level,\n", " 'technology', m.technology,\n", " 'biz_res', m.biz_res,\n", " 'speed_02_02', m.speed_02_02,\n", " 'speed_10_1', m.speed_10_1,\n", " 'speed_25_3', m.speed_25_3,\n", " 'speed_100_20', m.speed_100_20,\n", " 'speed_250_25', m.speed_250_25,\n", " 'speed_1000_100', m.speed_1000_100\n", " ),\n", " fcc_bdc_status = 'fcc_summary_joined',\n", " updated_at = now()\n", " from matched m\n", " where c.master_id = m.master_id\n", " ''',\n", " (as_of_date, as_of_date, as_of_date, as_of_date),\n", " )\n", " updated_rows = cur.rowcount\n", "\n", " after = connection_null_snapshot(conn)\n", "\n", "print(f'Using FCC as_of_date: {as_of_date}')\n", "print(f'Staging loaded files: {loaded_files:,} | rows: {loaded_rows:,}')\n", "print(f'Updated connection rows: {updated_rows:,}')\n", "print('\\nNull snapshot before:')\n", "display(before)\n", "print('Null snapshot after:')\n", "display(after)" ] }, { "cell_type": "code", "execution_count": null, "id": "21", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " join_counts = pd.read_sql(\n", " f'''\n", " select\n", " coalesce(fcc_bdc_geography_type, 'Unmatched') as join_level,\n", " count(*) as data_center_count,\n", " round(avg((fcc_summary_json ->> 'speed_100_20')::numeric), 6) as avg_speed_100_20,\n", " round(avg((fcc_summary_json ->> 'speed_1000_100')::numeric), 6) as avg_speed_1000_100\n", " from {CONNECTION_TABLE}\n", " group by 1\n", " order by case coalesce(fcc_bdc_geography_type, 'Unmatched')\n", " when 'County' then 0\n", " when 'State' then 1\n", " else 2\n", " end\n", " ''',\n", " conn,\n", " )\n", "\n", " sample_joined = pd.read_sql(\n", " f'''\n", " select\n", " master_id,\n", " name,\n", " state,\n", " census_tract_geoid,\n", " fcc_bdc_geography_type,\n", " fcc_bdc_geoid,\n", " fcc_summary_json ->> 'technology' as fcc_technology,\n", " fcc_summary_json ->> 'speed_100_20' as fcc_speed_100_20,\n", " fcc_summary_json ->> 'speed_1000_100' as fcc_speed_1000_100\n", " from {CONNECTION_TABLE}\n", " where fcc_summary_json is not null\n", " order by updated_at desc, master_id\n", " limit 25\n", " ''',\n", " conn,\n", " )\n", "\n", "print('Join coverage summary:')\n", "display(join_counts)\n", "print('Sample joined rows:')\n", "display(sample_joined)" ] }, { "cell_type": "code", "execution_count": null, "id": "22", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " derived_qa = pd.read_sql(\n", " f'''\n", " select\n", " count(*) as total_rows,\n", " count(*) filter (where fcc_summary_json is not null) as rows_with_summary_json,\n", " count(*) filter (where fcc_max_advertised_download_mbps is not null) as rows_with_max_download,\n", " count(*) filter (where fcc_max_advertised_upload_mbps is not null) as rows_with_max_upload,\n", " count(*) filter (where fcc_provider_count is not null) as rows_with_provider_count,\n", " count(*) filter (where fcc_100_20_provider_count is not null) as rows_with_100_20_provider_count\n", " from {CONNECTION_TABLE}\n", " ''',\n", " conn,\n", " )\n", "\n", " tier_dist = pd.read_sql(\n", " f'''\n", " select\n", " fcc_max_advertised_download_mbps,\n", " fcc_max_advertised_upload_mbps,\n", " count(*) as row_count\n", " from {CONNECTION_TABLE}\n", " where fcc_summary_json is not null\n", " group by 1, 2\n", " order by 1 desc nulls last, 2 desc nulls last\n", " ''',\n", " conn,\n", " )\n", "\n", "print('Derived scalar QA:')\n", "display(derived_qa)\n", "print('Derived tier distribution:')\n", "display(tier_dist.head(20))" ] }, { "cell_type": "code", "execution_count": null, "id": "23", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(\n", " f'''\n", " update {CONNECTION_TABLE}\n", " set\n", " fcc_max_advertised_download_mbps = case\n", " when coalesce((fcc_summary_json ->> 'speed_1000_100')::numeric, 0) > 0 then 1000\n", " when coalesce((fcc_summary_json ->> 'speed_250_25')::numeric, 0) > 0 then 250\n", " when coalesce((fcc_summary_json ->> 'speed_100_20')::numeric, 0) > 0 then 100\n", " when coalesce((fcc_summary_json ->> 'speed_25_3')::numeric, 0) > 0 then 25\n", " when coalesce((fcc_summary_json ->> 'speed_10_1')::numeric, 0) > 0 then 10\n", " when coalesce((fcc_summary_json ->> 'speed_02_02')::numeric, 0) > 0 then 2\n", " else null\n", " end,\n", " fcc_max_advertised_upload_mbps = case\n", " when coalesce((fcc_summary_json ->> 'speed_1000_100')::numeric, 0) > 0 then 100\n", " when coalesce((fcc_summary_json ->> 'speed_250_25')::numeric, 0) > 0 then 25\n", " when coalesce((fcc_summary_json ->> 'speed_100_20')::numeric, 0) > 0 then 20\n", " when coalesce((fcc_summary_json ->> 'speed_25_3')::numeric, 0) > 0 then 3\n", " when coalesce((fcc_summary_json ->> 'speed_10_1')::numeric, 0) > 0 then 1\n", " when coalesce((fcc_summary_json ->> 'speed_02_02')::numeric, 0) > 0 then 0.2\n", " else null\n", " end,\n", " fcc_bdc_status = case\n", " when fcc_bdc_status = 'fcc_summary_joined' then 'fcc_summary_joined_derived'\n", " else fcc_bdc_status\n", " end,\n", " updated_at = now()\n", " where fcc_summary_json is not null\n", " and fcc_summary_json ->> 'source' = 'fcc_summary_download'\n", " '''\n", " )\n", " derived_rows = cur.rowcount\n", "\n", "print(f'Derived scalar columns for rows: {derived_rows:,}')" ] }, { "cell_type": "markdown", "id": "24", "metadata": {}, "source": [ "## Phase 2C: Ingest Provider Summary Catalog\n", "\n", "The FCC provider-summary download is provider-level rather than geography-level, so this step stages it separately and uses global provider aggregates to populate the provider count columns." ] }, { "cell_type": "code", "execution_count": null, "id": "25", "metadata": {}, "outputs": [], "source": [ "PROVIDER_SUMMARY_TABLE = 'public.fcc_bdc_provider_summary'\n", "\n", "\n", "def classify_provider_technology(technology_code_desc: str | None) -> str:\n", " text = (technology_code_desc or '').strip().lower()\n", " if not text:\n", " return 'Other'\n", " if 'fiber' in text:\n", " return 'Fiber'\n", " if 'cable' in text:\n", " return 'Cable'\n", " if 'unlicensed fixed wireless' in text or 'licensed fixed wireless' in text or 'fixed wireless' in text:\n", " return 'Fixed Wireless'\n", " if 'copper' in text:\n", " return 'Copper'\n", " if 'satellite' in text:\n", " return 'Satellite'\n", " return 'Other'\n", "\n", "\n", "def create_provider_summary_table(cur) -> None:\n", " cur.execute(\n", " f\"\"\"\n", " create table if not exists {PROVIDER_SUMMARY_TABLE} (\n", " as_of_date date not null,\n", " file_id bigint not null,\n", " provider_id bigint not null,\n", " holding_company text,\n", " technology_code text,\n", " technology_code_desc text,\n", " provider_class text,\n", " location_count_res bigint,\n", " unit_count_res bigint,\n", " location_count_bus bigint,\n", " unit_count_bus bigint,\n", " raw jsonb not null,\n", " fetched_at timestamptz not null default now(),\n", " primary key (as_of_date, file_id, provider_id, technology_code, technology_code_desc)\n", " )\n", " \"\"\"\n", " )\n", "\n", "\n", "def load_provider_summary(cur, as_of_date: date) -> tuple[int, int, int, int, int, int, int]:\n", " cur.execute(\n", " f\"\"\"\n", " select file_id\n", " from {FILES_TABLE}\n", " where as_of_date = %s\n", " and category = 'Summary'\n", " and subcategory = 'Provider Summary'\n", " order by file_id\n", " limit 1\n", " \"\"\",\n", " (as_of_date,),\n", " )\n", " row = cur.fetchone()\n", " if not row:\n", " return 0, 0, 0, 0, 0, 0, 0\n", "\n", " file_id = int(row[0])\n", " csv_name, df = download_availability_file(file_id)\n", " if df.empty:\n", " return file_id, 0, 0, 0, 0, 0, 0\n", "\n", " cur.execute(f'delete from {PROVIDER_SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n", " values = []\n", " for row in df.to_dict('records'):\n", " provider_id = to_int(row.get('provider_id'))\n", " if provider_id is None:\n", " continue\n", " values.append((\n", " as_of_date,\n", " file_id,\n", " provider_id,\n", " row.get('holding_company'),\n", " row.get('technology_code'),\n", " row.get('technology_code_desc'),\n", " classify_provider_technology(row.get('technology_code_desc')),\n", " to_int(row.get('location_count_res')),\n", " to_int(row.get('unit_count_res')),\n", " to_int(row.get('location_count_bus')),\n", " to_int(row.get('unit_count_bus')),\n", " Json(row),\n", " ))\n", "\n", " if values:\n", " execute_values(\n", " cur,\n", " f\"\"\"\n", " insert into {PROVIDER_SUMMARY_TABLE} (\n", " as_of_date, file_id, provider_id, holding_company,\n", " technology_code, technology_code_desc, provider_class,\n", " location_count_res, unit_count_res, location_count_bus, unit_count_bus, raw\n", " )\n", " values %s\n", " on conflict (as_of_date, file_id, provider_id, technology_code, technology_code_desc)\n", " do update set\n", " holding_company = excluded.holding_company,\n", " provider_class = excluded.provider_class,\n", " location_count_res = excluded.location_count_res,\n", " unit_count_res = excluded.unit_count_res,\n", " location_count_bus = excluded.location_count_bus,\n", " unit_count_bus = excluded.unit_count_bus,\n", " raw = excluded.raw,\n", " fetched_at = now()\n", " \"\"\",\n", " values,\n", " page_size=1000,\n", " )\n", "\n", " cur.execute(\n", " f\"\"\"\n", " with provider_stats as (\n", " select\n", " count(distinct provider_id) as provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Fiber') as fiber_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Cable') as cable_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Fixed Wireless') as fixed_wireless_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Copper') as copper_provider_count\n", " from {PROVIDER_SUMMARY_TABLE}\n", " where as_of_date = %s\n", " )\n", " update {CONNECTION_TABLE} c\n", " set\n", " fcc_provider_count = s.provider_count,\n", " fcc_fiber_provider_count = s.fiber_provider_count,\n", " fcc_cable_provider_count = s.cable_provider_count,\n", " fcc_fixed_wireless_provider_count = s.fixed_wireless_provider_count,\n", " fcc_summary_json = jsonb_set(\n", " coalesce(c.fcc_summary_json, '{{}}'::jsonb),\n", " '{{provider_summary}}',\n", " jsonb_build_object(\n", " 'file_id', %s,\n", " 'provider_count', s.provider_count,\n", " 'fiber_provider_count', s.fiber_provider_count,\n", " 'cable_provider_count', s.cable_provider_count,\n", " 'fixed_wireless_provider_count', s.fixed_wireless_provider_count,\n", " 'copper_provider_count', s.copper_provider_count\n", " ),\n", " true\n", " ),\n", " updated_at = now()\n", " from provider_stats s\n", " where c.fcc_summary_json is not null\n", " \"\"\",\n", " (as_of_date, file_id),\n", " )\n", "\n", " cur.execute(f'select count(*) from {PROVIDER_SUMMARY_TABLE} where as_of_date = %s', (as_of_date,))\n", " provider_rows = cur.fetchone()[0]\n", "\n", " cur.execute(\n", " f\"\"\"\n", " select\n", " count(distinct provider_id),\n", " count(distinct provider_id) filter (where provider_class = 'Fiber'),\n", " count(distinct provider_id) filter (where provider_class = 'Cable'),\n", " count(distinct provider_id) filter (where provider_class = 'Fixed Wireless'),\n", " count(distinct provider_id) filter (where provider_class = 'Copper')\n", " from {PROVIDER_SUMMARY_TABLE}\n", " where as_of_date = %s\n", " \"\"\",\n", " (as_of_date,),\n", " )\n", " provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count = cur.fetchone()\n", " return file_id, provider_rows, provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count\n", "\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " create_provider_summary_table(cur)\n", " cur.execute(f'select max(as_of_date) from {FILES_TABLE}')\n", " provider_as_of_date = cur.fetchone()[0]\n", " if provider_as_of_date is None:\n", " raise RuntimeError(f'No as_of_date found in {FILES_TABLE} for provider summary load.')\n", " provider_file_id, provider_rows, provider_count, fiber_count, cable_count, fixed_wireless_count, copper_count = load_provider_summary(cur, provider_as_of_date)\n", " conn.commit()\n", "\n", "print(f'Provider summary file_id: {provider_file_id}')\n", "print(f'Provider summary rows loaded: {provider_rows:,}')\n", "print(f'Provider counts: total={provider_count:,}, fiber={fiber_count:,}, cable={cable_count:,}, fixed_wireless={fixed_wireless_count:,}, copper={copper_count:,}')" ] }, { "cell_type": "code", "execution_count": null, "id": "26", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " provider_qa = pd.read_sql(\n", " f'''\n", " select\n", " count(*) as total_rows,\n", " count(*) filter (where fcc_provider_count is not null) as rows_with_provider_count,\n", " count(*) filter (where fcc_fiber_provider_count is not null) as rows_with_fiber_provider_count,\n", " count(*) filter (where fcc_cable_provider_count is not null) as rows_with_cable_provider_count,\n", " count(*) filter (where fcc_fixed_wireless_provider_count is not null) as rows_with_fixed_wireless_provider_count,\n", " count(*) filter (where fcc_summary_json -> 'provider_summary' is not null) as rows_with_provider_summary_json\n", " from {CONNECTION_TABLE}\n", " ''',\n", " conn,\n", " )\n", "\n", " provider_agg = pd.read_sql(\n", " f'''\n", " select\n", " count(distinct provider_id) as provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Fiber') as fiber_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Cable') as cable_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Fixed Wireless') as fixed_wireless_provider_count,\n", " count(distinct provider_id) filter (where provider_class = 'Copper') as copper_provider_count\n", " from {PROVIDER_SUMMARY_TABLE}\n", " where as_of_date = %s\n", " ''',\n", " conn,\n", " params=(provider_as_of_date,),\n", " )\n", "\n", "print('Provider-summary QA:')\n", "display(provider_qa)\n", "print('Provider aggregate counts:')\n", "display(provider_agg)" ] }, { "cell_type": "markdown", "id": "27", "metadata": {}, "source": [ "## Phase 2B: Derive Scalar Broadband Columns from Summary JSON\n", "\n", "This step derives scalar speed columns from `fcc_summary_json` for easier SQL use.\n", "\n", "Notes:\n", "- `fcc_max_advertised_download_mbps` / `fcc_max_advertised_upload_mbps` are estimated from the highest speed tier with non-zero availability share.\n", "- Provider-count columns are populated from the separate provider-summary catalog, which is global catalog context rather than geography-specific broadband coverage." ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.14.5" } }, "nbformat": 4, "nbformat_minor": 5 }