Files
data-centers/build_fcc_bdc_location_provider_aggregates.py

807 lines
34 KiB
Python

#!/usr/bin/env python3
"""Build FCC BDC provider aggregates for data-center counties and tracts.

This script uses FCC BDC State / Location Coverage files. Those files are
provider/location-level and include block GEOIDs, so they can be aggregated to
county and tract provider counts for only the geographies that contain data
centers.

Outputs (see the ``*_TABLE`` constants): a per-file provider/geography detail
table, a per-geography aggregate table, a per-file progress table used to
resume interrupted runs, and a GEOID crosswalk. Unless
``--no-update-connection`` is given, the aggregates are also written back onto
the connection table.
"""
from __future__ import annotations
import argparse
import os
import tempfile
import zipfile
from collections.abc import Iterable
from datetime import date
from pathlib import Path
from typing import Any
import pandas as pd
import requests
from psycopg2.extras import execute_values
from build_fcc_bdc_broadband_connection_table import (
CONNECTION_TABLE,
FCC_BASE_URL,
FILES_TABLE,
USER_AGENT,
fcc_credentials,
get_conn,
load_zsh_secrets,
parse_date,
require_env,
)
# Tables written by this script; all live in the ``public`` schema.
DETAIL_TABLE = "public.fcc_bdc_location_provider_geography_provider"
AGG_TABLE = "public.fcc_bdc_location_provider_aggregate"
PROGRESS_TABLE = "public.fcc_bdc_location_provider_file_progress"
CROSSWALK_TABLE = "public.fcc_bdc_geoid_crosswalk"

# FCC BDC technology codes that summarize_matches() flags as fixed wireless.
FIXED_WIRELESS_CODES = {"70", "71", "72"}
# Default --technology-codes: 10, 40 and 50 (the codes summarize_matches()
# maps to has_copper / has_cable / has_fiber) followed by the fixed-wireless
# codes above in ascending order.
TERRESTRIAL_TECHNOLOGY_CODES = ("10", "40", "50", *sorted(FIXED_WIRELESS_CODES))

# The only columns read from each Location Coverage CSV.
CSV_USECOLS = [
    "provider_id",
    "block_geoid",
    "technology",
    "max_advertised_download_speed",
    "max_advertised_upload_speed",
    "business_residential_code",
]

# Connecticut tract GEOIDs whose county part is a planning-region code, mapped
# to the same tract code under the legacy county used in FCC BDC block GEOIDs.
# seed_geoid_crosswalk() loads these pairs into CROSSWALK_TABLE.
CT_PLANNING_TO_LEGACY_TRACT_GEOIDS = {
    "09110520302": "09003520302",
    "09120090500": "09001090500",
    "09170175800": "09009175800",
    "09190020101": "09001020101",
    "09190020900": "09001020900",
    "09190044300": "09001044300",
}
def fcc_download_headers() -> dict[str, str]:
    """Return the HTTP headers used to authenticate an FCC BDC file download.

    Credentials come from ``fcc_credentials()``; both the username and the
    hash value must be non-empty.

    Raises:
        RuntimeError: if either credential is missing or empty.
    """
    username, hash_value = fcc_credentials()
    if username and hash_value:
        return {
            "username": username,
            "hash_value": hash_value,
            "user-agent": USER_AGENT,
            "accept": "application/zip,*/*",
        }
    raise RuntimeError(
        "FCC BDC API requires FCC_USERNAME or FCC_BDC_USERNAME plus "
        "FCC_API_KEY or FCC_HASH_VALUE."
    )
def normalize_codes(values: Iterable[str]) -> tuple[str, ...]:
    """Return the whitespace-stripped string form of each value, dropping blanks.

    Every value is passed through ``str()`` first, so non-string inputs such as
    ints are accepted. Input order is preserved.
    """
    stripped = (str(raw).strip() for raw in values)
    return tuple(code for code in stripped if code)
def create_tables(cur) -> None:
    """Create this script's tables and index, then widen CONNECTION_TABLE.

    Every statement uses ``if not exists``, so this is run on each invocation.
    The statements run in this order:

    * DETAIL_TABLE: one row per (as_of_date, file_id, geography_type, geoid,
      provider_id) with capability flags, max advertised speeds and a matched
      row count, plus an index on (as_of_date, geography_type, geoid);
    * AGG_TABLE: per-geography provider counts keyed by
      (as_of_date, geography_type, geoid);
    * PROGRESS_TABLE: one row per processed (as_of_date, file_id);
    * CROSSWALK_TABLE: source-to-FCC GEOID mappings;
    * the ``fcc_provider_*``, ``fcc_county_*`` and ``fcc_tract_*`` columns on
      CONNECTION_TABLE.
    """
    schema_statements = (
        f"""
        create table if not exists {DETAIL_TABLE} (
            as_of_date date not null,
            file_id bigint not null,
            geography_type text not null check (geography_type in ('County', 'Tract')),
            geoid text not null,
            provider_id bigint not null,
            has_fiber boolean not null default false,
            has_cable boolean not null default false,
            has_fixed_wireless boolean not null default false,
            has_copper boolean not null default false,
            has_100_20 boolean not null default false,
            has_business boolean not null default false,
            has_business_fiber boolean not null default false,
            has_business_100_20 boolean not null default false,
            max_advertised_download_mbps numeric,
            max_advertised_upload_mbps numeric,
            matched_location_rows bigint not null default 0,
            updated_at timestamptz not null default now(),
            primary key (as_of_date, file_id, geography_type, geoid, provider_id)
        )
        """,
        (
            f"create index if not exists fcc_bdc_location_provider_geo_idx "
            f"on {DETAIL_TABLE} (as_of_date, geography_type, geoid)"
        ),
        f"""
        create table if not exists {AGG_TABLE} (
            as_of_date date not null,
            geography_type text not null check (geography_type in ('County', 'Tract')),
            geoid text not null,
            provider_count integer not null,
            fiber_provider_count integer not null,
            cable_provider_count integer not null,
            fixed_wireless_provider_count integer not null,
            copper_provider_count integer not null,
            provider_100_20_count integer not null,
            business_provider_count integer not null,
            business_fiber_provider_count integer not null,
            business_100_20_provider_count integer not null,
            max_advertised_download_mbps numeric,
            max_advertised_upload_mbps numeric,
            matched_location_rows bigint not null,
            provider_file_rows bigint not null,
            updated_at timestamptz not null default now(),
            primary key (as_of_date, geography_type, geoid)
        )
        """,
        f"""
        create table if not exists {PROGRESS_TABLE} (
            as_of_date date not null,
            file_id bigint not null,
            state_fips text not null,
            technology_code text,
            technology_code_desc text,
            record_count bigint,
            matched_location_rows bigint not null,
            provider_geo_rows bigint not null,
            processed_at timestamptz not null default now(),
            primary key (as_of_date, file_id)
        )
        """,
        f"""
        create table if not exists {CROSSWALK_TABLE} (
            source_geography_type text not null,
            source_geoid text not null,
            fcc_geography_type text not null,
            fcc_geoid text not null,
            method text not null,
            notes text,
            updated_at timestamptz not null default now(),
            primary key (source_geography_type, source_geoid, fcc_geography_type)
        )
        """,
    )
    for statement in schema_statements:
        cur.execute(statement)

    # Nullable columns filled by update_connection_table().
    connection_columns = (
        "fcc_provider_geography_type text",
        "fcc_provider_geoid text",
        "fcc_county_provider_count integer",
        "fcc_county_fiber_provider_count integer",
        "fcc_county_cable_provider_count integer",
        "fcc_county_fixed_wireless_provider_count integer",
        "fcc_county_100_20_provider_count integer",
        "fcc_county_business_provider_count integer",
        "fcc_county_business_fiber_provider_count integer",
        "fcc_county_business_100_20_provider_count integer",
        "fcc_county_max_advertised_download_mbps numeric",
        "fcc_county_max_advertised_upload_mbps numeric",
        "fcc_tract_provider_count integer",
        "fcc_tract_fiber_provider_count integer",
        "fcc_tract_cable_provider_count integer",
        "fcc_tract_fixed_wireless_provider_count integer",
        "fcc_tract_100_20_provider_count integer",
        "fcc_tract_business_provider_count integer",
        "fcc_tract_business_fiber_provider_count integer",
        "fcc_tract_business_100_20_provider_count integer",
        "fcc_tract_max_advertised_download_mbps numeric",
        "fcc_tract_max_advertised_upload_mbps numeric",
    )
    for column_ddl in connection_columns:
        cur.execute(f"alter table {CONNECTION_TABLE} add column if not exists {column_ddl}")
def seed_geoid_crosswalk(cur) -> None:
    """Upsert the built-in Connecticut tract mappings into CROSSWALK_TABLE.

    One Tract-to-Tract row is written for each entry of
    CT_PLANNING_TO_LEGACY_TRACT_GEOIDS. Re-running refreshes ``fcc_geoid``,
    ``method``, ``notes`` and ``updated_at`` on rows that already exist.
    """
    method = "ct_planning_region_to_legacy_county_same_tractce"
    notes = (
        "Connecticut 2024 tract GEOIDs use planning-region county equivalents; "
        "FCC BDC block GEOIDs use legacy county codes."
    )
    rows = []
    for planning_geoid, legacy_geoid in CT_PLANNING_TO_LEGACY_TRACT_GEOIDS.items():
        rows.append(("Tract", planning_geoid, "Tract", legacy_geoid, method, notes))

    upsert_sql = f"""
        insert into {CROSSWALK_TABLE} (
            source_geography_type, source_geoid, fcc_geography_type,
            fcc_geoid, method, notes
        )
        values %s
        on conflict (source_geography_type, source_geoid, fcc_geography_type)
        do update set
            fcc_geoid = excluded.fcc_geoid,
            method = excluded.method,
            notes = excluded.notes,
            updated_at = now()
    """
    execute_values(cur, upsert_sql, rows)
def latest_catalog_date(cur) -> date:
    """Return the newest ``as_of_date`` present in FILES_TABLE.

    Raises:
        RuntimeError: when the catalog is empty (``max`` yields NULL).
    """
    cur.execute(f"select max(as_of_date) from {FILES_TABLE}")
    row = cur.fetchone()
    latest = row[0]
    if latest is not None:
        return latest
    raise RuntimeError(f"No FCC catalog rows found in {FILES_TABLE}. Run the FCC catalog load first.")
def target_geographies(cur, states: tuple[str, ...] | None = None) -> tuple[set[str], set[str], set[str]]:
    """Collect the states, counties and tracts that contain data centers.

    The 2-, 5- and 11-character prefixes of
    ``CONNECTION_TABLE.census_tract_geoid`` are read, optionally restricted to
    the given state FIPS codes. Any FCC tract GEOIDs that CROSSWALK_TABLE maps
    those tracts to are added to the tract set, and their 5-character county
    prefixes are added to the county set, so files keyed by the FCC codes
    still match.

    Returns:
        (state_fips, county_geoids, tract_geoids) as sets. NULL and empty
        values are dropped.
    """
    if states:
        where_sql = "where left(census_tract_geoid, 2) = any(%s)"
        query_params: list[Any] = [list(states)]
    else:
        where_sql = ""
        query_params = []
    cur.execute(
        f"""
        select distinct
            left(census_tract_geoid, 2) as state_fips,
            left(census_tract_geoid, 5) as county_geoid,
            left(census_tract_geoid, 11) as tract_geoid
        from {CONNECTION_TABLE}
        {where_sql}
        """,
        query_params,
    )
    fetched = cur.fetchall()
    state_set = {row[0] for row in fetched if row[0]}
    county_set = {row[1] for row in fetched if row[1]}
    tract_set = {row[2] for row in fetched if row[2]}
    if not tract_set:
        return state_set, county_set, tract_set

    cur.execute(
        f"""
        select fcc_geoid
        from {CROSSWALK_TABLE}
        where source_geography_type = 'Tract'
            and fcc_geography_type = 'Tract'
            and source_geoid = any(%s)
        """,
        (list(tract_set),),
    )
    crosswalk_tracts = {row[0] for row in cur.fetchall() if row[0]}
    tract_set |= crosswalk_tracts
    county_set |= {fcc_tract[:5] for fcc_tract in crosswalk_tracts}
    return state_set, county_set, tract_set
def catalog_files(
    cur,
    as_of_date: date,
    states: set[str],
    technology_codes: tuple[str, ...],
    limit: int | None,
) -> list[dict[str, Any]]:
    """List the State / Location Coverage catalog entries to process.

    Entries are filtered by as-of date, state FIPS and technology code, and
    ordered by state, technology and file_id. When ``limit`` is given, only
    the first ``limit`` entries are returned.

    Each entry is a dict with ``file_id`` (int), ``state_fips``,
    ``technology_code`` (str), ``technology_code_desc``, ``file_name`` and
    ``record_count`` (as stored in the catalog, possibly None).
    """
    cur.execute(
        f"""
        select file_id, state_fips, technology_code, technology_code_desc, file_name, record_count
        from {FILES_TABLE}
        where as_of_date = %s
            and category = 'State'
            and subcategory = 'Location Coverage'
            and state_fips = any(%s)
            and technology_code = any(%s)
        order by state_fips, technology_code, file_id
        """,
        (as_of_date, list(states), list(technology_codes)),
    )
    entries: list[dict[str, Any]] = []
    for fid, fips, tech_code, tech_desc, name, records in cur.fetchall():
        entries.append(
            {
                "file_id": int(fid),
                "state_fips": fips,
                "technology_code": str(tech_code),
                "technology_code_desc": tech_desc,
                "file_name": name,
                "record_count": records,
            }
        )
    if limit is None:
        return entries
    return entries[:limit]
def progress_done(cur, as_of_date: date, file_id: int) -> bool:
    """Report whether PROGRESS_TABLE already has a row for this file and date."""
    query = f"select 1 from {PROGRESS_TABLE} where as_of_date = %s and file_id = %s"
    cur.execute(query, (as_of_date, file_id))
    found = cur.fetchone()
    return found is not None
def download_file(file_id: int, dest_dir: Path) -> Path:
    """Stream one FCC availability ZIP into ``dest_dir`` and return its path.

    The file is written in 1 MiB chunks as ``fcc_bdc_availability_<id>.zip``.
    HTTP error statuses are raised by ``raise_for_status``. The request uses a
    15 s connect timeout and a 300 s read timeout.
    """
    target = dest_dir / f"fcc_bdc_availability_{file_id}.zip"
    endpoint = f"{FCC_BASE_URL}/map/downloads/downloadFile/availability/{file_id}"
    auth_headers = fcc_download_headers()
    with requests.get(endpoint, headers=auth_headers, stream=True, timeout=(15, 300)) as response:
        response.raise_for_status()
        with target.open("wb") as out:
            # filter(None, ...) skips empty keep-alive chunks.
            for piece in filter(None, response.iter_content(chunk_size=1024 * 1024)):
                out.write(piece)
    return target
def normalize_block_geoid(series: pd.Series) -> pd.Series:
    """Return block GEOIDs as 15-character, zero-padded strings.

    A trailing ``.0`` (left over when a GEOID was parsed as a float) is removed
    before padding. Missing values stay missing.
    """
    as_text = series.astype("string")
    without_float_suffix = as_text.str.replace(r"\.0$", "", regex=True)
    return without_float_suffix.str.zfill(15)
def summarize_matches(
    chunk: pd.DataFrame,
    geography_type: str,
    target_geoids: set[str],
) -> tuple[pd.DataFrame, int]:
    """Collapse one CSV chunk to one row per (geoid, provider) in the targets.

    ``chunk`` must already have a ``block_geoid_norm`` column. The county or
    tract GEOID is its 5- or 11-character prefix (``geography_type`` of
    ``"County"``, anything else meaning tract).

    Returns:
        ``(grouped, matched_rows)``. ``grouped`` has one row per ``geoid`` /
        ``provider_id_num`` with capability flags OR-ed across rows, the max
        advertised speeds (``max_down`` / ``max_up``) and the number of
        contributing CSV rows (``matched_location_rows``). ``matched_rows`` is
        the number of chunk rows inside ``target_geoids`` that have a numeric
        provider id. An empty DataFrame and 0 are returned when nothing
        matches.
    """

    def as_flag(values: pd.Series) -> pd.Series:
        # The CSV is read as StringDtype, so pd.to_numeric returns nullable
        # numerics and comparisons such as .eq()/.ge() return BooleanDtype
        # with <NA> for blank cells. If every row of a group is <NA>, the
        # group max is <NA> and bool(<NA>) raises in upsert_detail. Treat
        # "unknown" as "not present" so every flag is a plain bool.
        return values.fillna(False).astype(bool)

    geoid_len = 5 if geography_type == "County" else 11
    geoid = chunk["block_geoid_norm"].str[:geoid_len]
    matched = chunk[geoid.isin(target_geoids)].copy()
    if matched.empty:
        return pd.DataFrame(), 0
    matched["geoid"] = geoid[matched.index]
    matched["provider_id_num"] = pd.to_numeric(matched["provider_id"], errors="coerce")
    matched = matched[matched["provider_id_num"].notna()].copy()
    if matched.empty:
        return pd.DataFrame(), 0

    tech = matched["technology"].astype("string").str.replace(r"\.0$", "", regex=True)
    down = pd.to_numeric(matched["max_advertised_download_speed"], errors="coerce")
    upload = pd.to_numeric(matched["max_advertised_upload_speed"], errors="coerce")
    business_code = matched["business_residential_code"].astype("string").str.upper().fillna("")
    # "B" and "X" codes count as business service here.
    business = as_flag(business_code.isin(["B", "X"]))

    matched["provider_id_num"] = matched["provider_id_num"].astype("int64")
    matched["has_fiber"] = as_flag(tech.eq("50"))
    matched["has_cable"] = as_flag(tech.eq("40"))
    matched["has_fixed_wireless"] = as_flag(tech.isin(FIXED_WIRELESS_CODES))
    matched["has_copper"] = as_flag(tech.eq("10"))
    matched["has_100_20"] = as_flag(down.ge(100)) & as_flag(upload.ge(20))
    matched["has_business"] = business
    matched["has_business_fiber"] = business & matched["has_fiber"]
    matched["has_business_100_20"] = business & matched["has_100_20"]
    matched["max_down"] = down
    matched["max_up"] = upload
    matched["matched_location_rows"] = 1

    grouped = (
        matched.groupby(["geoid", "provider_id_num"], as_index=False)
        .agg(
            has_fiber=("has_fiber", "max"),
            has_cable=("has_cable", "max"),
            has_fixed_wireless=("has_fixed_wireless", "max"),
            has_copper=("has_copper", "max"),
            has_100_20=("has_100_20", "max"),
            has_business=("has_business", "max"),
            has_business_fiber=("has_business_fiber", "max"),
            has_business_100_20=("has_business_100_20", "max"),
            max_down=("max_down", "max"),
            max_up=("max_up", "max"),
            matched_location_rows=("matched_location_rows", "sum"),
        )
    )
    return grouped, len(matched)
def upsert_detail(
    cur,
    as_of_date: date,
    file_id: int,
    geography_type: str,
    grouped: pd.DataFrame,
) -> int:
    """Merge per-(geoid, provider) summaries for one file into DETAIL_TABLE.

    On a key conflict the existing row is combined rather than replaced:
    flags are OR-ed, speed maxima use ``greatest`` and
    ``matched_location_rows`` is added. A provider/geography seen in several
    chunks of the same file therefore accumulates into one row.

    Returns:
        The number of rows sent to the insert (0 for an empty frame).
    """
    if grouped.empty:
        return 0

    def optional_float(value):
        return None if pd.isna(value) else float(value)

    records = []
    for rec in grouped.itertuples(index=False):
        records.append(
            (
                as_of_date,
                file_id,
                geography_type,
                rec.geoid,
                int(rec.provider_id_num),
                bool(rec.has_fiber),
                bool(rec.has_cable),
                bool(rec.has_fixed_wireless),
                bool(rec.has_copper),
                bool(rec.has_100_20),
                bool(rec.has_business),
                bool(rec.has_business_fiber),
                bool(rec.has_business_100_20),
                optional_float(rec.max_down),
                optional_float(rec.max_up),
                int(rec.matched_location_rows),
            )
        )

    statement = f"""
        insert into {DETAIL_TABLE} (
            as_of_date, file_id, geography_type, geoid, provider_id,
            has_fiber, has_cable, has_fixed_wireless, has_copper,
            has_100_20, has_business, has_business_fiber, has_business_100_20,
            max_advertised_download_mbps, max_advertised_upload_mbps,
            matched_location_rows
        )
        values %s
        on conflict (as_of_date, file_id, geography_type, geoid, provider_id)
        do update set
            has_fiber = {DETAIL_TABLE}.has_fiber or excluded.has_fiber,
            has_cable = {DETAIL_TABLE}.has_cable or excluded.has_cable,
            has_fixed_wireless = {DETAIL_TABLE}.has_fixed_wireless or excluded.has_fixed_wireless,
            has_copper = {DETAIL_TABLE}.has_copper or excluded.has_copper,
            has_100_20 = {DETAIL_TABLE}.has_100_20 or excluded.has_100_20,
            has_business = {DETAIL_TABLE}.has_business or excluded.has_business,
            has_business_fiber = {DETAIL_TABLE}.has_business_fiber or excluded.has_business_fiber,
            has_business_100_20 = {DETAIL_TABLE}.has_business_100_20 or excluded.has_business_100_20,
            max_advertised_download_mbps = greatest(
                {DETAIL_TABLE}.max_advertised_download_mbps,
                excluded.max_advertised_download_mbps
            ),
            max_advertised_upload_mbps = greatest(
                {DETAIL_TABLE}.max_advertised_upload_mbps,
                excluded.max_advertised_upload_mbps
            ),
            matched_location_rows = {DETAIL_TABLE}.matched_location_rows + excluded.matched_location_rows,
            updated_at = now()
    """
    execute_values(cur, statement, records, page_size=1000)
    return len(records)
def process_file(
    conn,
    file_row: dict[str, Any],
    as_of_date: date,
    county_geoids: set[str],
    tract_geoids: set[str],
    chunksize: int,
    temp_dir: Path,
) -> tuple[int, int]:
    """Download one Location Coverage ZIP and load its matches into DETAIL_TABLE.

    The sequence of commits is what makes resuming safe:

    1. The file's existing detail and progress rows for ``as_of_date`` are
       deleted and committed before any chunk is processed.
    2. Each chunk's county- and tract-level summaries are upserted and
       committed as they are produced.
    3. The progress row is written only after the last chunk.

    A run that dies part-way therefore leaves no progress row. ``progress_done``
    then reports the file as unprocessed, and the next run repeats the
    delete-and-reload.

    Args:
        conn: open database connection; committed several times here.
        file_row: one entry from ``catalog_files``.
        as_of_date: FCC availability as-of date the rows are keyed by.
        county_geoids: target 5-character county GEOIDs.
        tract_geoids: target 11-character tract GEOIDs.
        chunksize: number of CSV rows per pandas chunk.
        temp_dir: directory the ZIP is downloaded into. The ZIP is deleted in
            ``finally`` once the download has succeeded.

    Returns:
        ``(matched_rows, provider_geo_rows)``. ``matched_rows`` adds county and
        tract matches, so a location in both a target county and a target
        tract counts twice ("row-events"). ``provider_geo_rows`` is the number
        of detail rows sent to ``upsert_detail``.

    Raises:
        RuntimeError: if the archive contains no ``.csv`` member.
    """
    file_id = file_row["file_id"]
    zip_path = download_file(file_id, temp_dir)
    matched_rows = 0
    provider_geo_rows = 0
    try:
        with zipfile.ZipFile(zip_path) as archive:
            csv_names = [name for name in archive.namelist() if name.lower().endswith(".csv")]
            if not csv_names:
                raise RuntimeError(f"FCC file_id={file_id} did not contain a CSV: {archive.namelist()}")
            # Only the first CSV member of the archive is read.
            with archive.open(csv_names[0]) as csv_file:
                reader = pd.read_csv(
                    csv_file,
                    usecols=CSV_USECOLS,
                    dtype="string",
                    chunksize=chunksize,
                    low_memory=False,
                )
                # Clear any partial output left by an earlier interrupted run
                # of this file before writing new chunks.
                with conn.cursor() as cur:
                    cur.execute(
                        f"delete from {DETAIL_TABLE} where as_of_date = %s and file_id = %s",
                        (as_of_date, file_id),
                    )
                    cur.execute(
                        f"delete from {PROGRESS_TABLE} where as_of_date = %s and file_id = %s",
                        (as_of_date, file_id),
                    )
                conn.commit()
                for chunk_number, chunk in enumerate(reader, start=1):
                    chunk["block_geoid_norm"] = normalize_block_geoid(chunk["block_geoid"])
                    county_grouped, county_matches = summarize_matches(chunk, "County", county_geoids)
                    tract_grouped, tract_matches = summarize_matches(chunk, "Tract", tract_geoids)
                    with conn.cursor() as cur:
                        provider_geo_rows += upsert_detail(cur, as_of_date, file_id, "County", county_grouped)
                        provider_geo_rows += upsert_detail(cur, as_of_date, file_id, "Tract", tract_grouped)
                    # Commit per chunk so work survives a later failure.
                    conn.commit()
                    matched_rows += county_matches + tract_matches
                    if matched_rows and chunk_number % 10 == 0:
                        print(f"  file_id={file_id}: chunk {chunk_number:,}, matched row-events={matched_rows:,}")
        # Mark the file complete only after every chunk has been committed.
        with conn.cursor() as cur:
            cur.execute(
                f"""
                insert into {PROGRESS_TABLE} (
                    as_of_date, file_id, state_fips, technology_code,
                    technology_code_desc, record_count, matched_location_rows, provider_geo_rows
                )
                values (%s, %s, %s, %s, %s, %s, %s, %s)
                on conflict (as_of_date, file_id) do update set
                    state_fips = excluded.state_fips,
                    technology_code = excluded.technology_code,
                    technology_code_desc = excluded.technology_code_desc,
                    record_count = excluded.record_count,
                    matched_location_rows = excluded.matched_location_rows,
                    provider_geo_rows = excluded.provider_geo_rows,
                    processed_at = now()
                """,
                (
                    as_of_date,
                    file_id,
                    file_row["state_fips"],
                    file_row["technology_code"],
                    file_row["technology_code_desc"],
                    file_row["record_count"],
                    matched_rows,
                    provider_geo_rows,
                ),
            )
        conn.commit()
        return matched_rows, provider_geo_rows
    finally:
        zip_path.unlink(missing_ok=True)
def rebuild_aggregate(cur, as_of_date: date) -> int:
    """Recompute AGG_TABLE for one as-of date from DETAIL_TABLE.

    Existing aggregate rows for the date are deleted first. Detail rows are
    collapsed per (geography, provider) across files, OR-ing flags and taking
    speed maxima. Each geography then gets distinct-provider counts per
    capability, overall speed maxima, the summed ``matched_location_rows`` and
    ``provider_file_rows`` (the number of contributing detail rows).

    Returns:
        The number of aggregate rows inserted (``cur.rowcount``).
    """
    date_param = (as_of_date,)
    cur.execute(f"delete from {AGG_TABLE} where as_of_date = %s", date_param)
    insert_sql = f"""
        insert into {AGG_TABLE} (
            as_of_date, geography_type, geoid,
            provider_count, fiber_provider_count, cable_provider_count,
            fixed_wireless_provider_count, copper_provider_count,
            provider_100_20_count, business_provider_count,
            business_fiber_provider_count, business_100_20_provider_count,
            max_advertised_download_mbps, max_advertised_upload_mbps,
            matched_location_rows, provider_file_rows
        )
        with per_provider as (
            select
                as_of_date,
                geography_type,
                geoid,
                provider_id,
                bool_or(has_fiber) as has_fiber,
                bool_or(has_cable) as has_cable,
                bool_or(has_fixed_wireless) as has_fixed_wireless,
                bool_or(has_copper) as has_copper,
                bool_or(has_100_20) as has_100_20,
                bool_or(has_business) as has_business,
                bool_or(has_business_fiber) as has_business_fiber,
                bool_or(has_business_100_20) as has_business_100_20,
                max(max_advertised_download_mbps) as max_advertised_download_mbps,
                max(max_advertised_upload_mbps) as max_advertised_upload_mbps,
                sum(matched_location_rows) as matched_location_rows,
                count(*) as provider_file_rows
            from {DETAIL_TABLE}
            where as_of_date = %s
            group by 1, 2, 3, 4
        )
        select
            as_of_date,
            geography_type,
            geoid,
            count(*)::integer as provider_count,
            count(*) filter (where has_fiber)::integer as fiber_provider_count,
            count(*) filter (where has_cable)::integer as cable_provider_count,
            count(*) filter (where has_fixed_wireless)::integer as fixed_wireless_provider_count,
            count(*) filter (where has_copper)::integer as copper_provider_count,
            count(*) filter (where has_100_20)::integer as provider_100_20_count,
            count(*) filter (where has_business)::integer as business_provider_count,
            count(*) filter (where has_business_fiber)::integer as business_fiber_provider_count,
            count(*) filter (where has_business_100_20)::integer as business_100_20_provider_count,
            max(max_advertised_download_mbps) as max_advertised_download_mbps,
            max(max_advertised_upload_mbps) as max_advertised_upload_mbps,
            sum(matched_location_rows)::bigint as matched_location_rows,
            sum(provider_file_rows)::bigint as provider_file_rows
        from per_provider
        group by 1, 2, 3
    """
    cur.execute(insert_sql, date_param)
    return cur.rowcount
def update_connection_table(cur, as_of_date: date) -> int:
    """Copy the tract and county aggregates for ``as_of_date`` onto CONNECTION_TABLE.

    Each connection row is keyed to an FCC tract GEOID: its crosswalked
    ``fcc_geoid`` when one exists, otherwise the first 11 characters of
    ``census_tract_geoid``. Its county key is the 5-character prefix of the
    same source. Matching AGG_TABLE rows fill the ``fcc_county_*`` and
    ``fcc_tract_*`` columns. The unprefixed ``fcc_*`` summary columns prefer
    the tract value and fall back to the county value. A
    ``location_provider_aggregate`` entry is merged into ``fcc_summary_json``.
    Rows with neither a tract nor a county aggregate are left untouched.

    Returns:
        The number of updated rows (``cur.rowcount``).
    """
    # The crosswalk is matched on the 11-character tract prefix, exactly as
    # target_geographies() looks it up. Matching on the raw column would miss
    # rows whose census_tract_geoid is longer than a tract GEOID, even though
    # their FCC tract was loaded. The keys are computed once in ``keyed`` so
    # both aggregate joins use the same expressions.
    cur.execute(
        f"""
        with keyed as (
            select
                c.master_id,
                coalesce(x.fcc_geoid, left(c.census_tract_geoid, 11)) as provider_tract_geoid,
                coalesce(left(x.fcc_geoid, 5), left(c.census_tract_geoid, 5)) as provider_county_geoid
            from {CONNECTION_TABLE} c
            left join {CROSSWALK_TABLE} x
                on x.source_geography_type = 'Tract'
                and x.fcc_geography_type = 'Tract'
                and x.source_geoid = left(c.census_tract_geoid, 11)
        ),
        joined as (
            select
                k.master_id,
                county.geoid as county_geoid,
                tract.geoid as tract_geoid,
                county.provider_count as county_provider_count,
                county.fiber_provider_count as county_fiber_provider_count,
                county.cable_provider_count as county_cable_provider_count,
                county.fixed_wireless_provider_count as county_fixed_wireless_provider_count,
                county.provider_100_20_count as county_100_20_provider_count,
                county.business_provider_count as county_business_provider_count,
                county.business_fiber_provider_count as county_business_fiber_provider_count,
                county.business_100_20_provider_count as county_business_100_20_provider_count,
                county.max_advertised_download_mbps as county_max_down,
                county.max_advertised_upload_mbps as county_max_up,
                tract.provider_count as tract_provider_count,
                tract.fiber_provider_count as tract_fiber_provider_count,
                tract.cable_provider_count as tract_cable_provider_count,
                tract.fixed_wireless_provider_count as tract_fixed_wireless_provider_count,
                tract.provider_100_20_count as tract_100_20_provider_count,
                tract.business_provider_count as tract_business_provider_count,
                tract.business_fiber_provider_count as tract_business_fiber_provider_count,
                tract.business_100_20_provider_count as tract_business_100_20_provider_count,
                tract.max_advertised_download_mbps as tract_max_down,
                tract.max_advertised_upload_mbps as tract_max_up
            from keyed k
            left join {AGG_TABLE} county
                on county.as_of_date = %s
                and county.geography_type = 'County'
                and county.geoid = k.provider_county_geoid
            left join {AGG_TABLE} tract
                on tract.as_of_date = %s
                and tract.geography_type = 'Tract'
                and tract.geoid = k.provider_tract_geoid
        )
        update {CONNECTION_TABLE} c
        set
            fcc_provider_geography_type = case
                when j.tract_geoid is not null then 'Tract'
                when j.county_geoid is not null then 'County'
                else c.fcc_provider_geography_type
            end,
            fcc_provider_geoid = coalesce(j.tract_geoid, j.county_geoid, c.fcc_provider_geoid),
            fcc_provider_count = coalesce(j.tract_provider_count, j.county_provider_count),
            fcc_fiber_provider_count = coalesce(j.tract_fiber_provider_count, j.county_fiber_provider_count),
            fcc_cable_provider_count = coalesce(j.tract_cable_provider_count, j.county_cable_provider_count),
            fcc_fixed_wireless_provider_count = coalesce(j.tract_fixed_wireless_provider_count, j.county_fixed_wireless_provider_count),
            fcc_100_20_provider_count = coalesce(j.tract_100_20_provider_count, j.county_100_20_provider_count),
            fcc_max_advertised_download_mbps = coalesce(j.tract_max_down, j.county_max_down, c.fcc_max_advertised_download_mbps),
            fcc_max_advertised_upload_mbps = coalesce(j.tract_max_up, j.county_max_up, c.fcc_max_advertised_upload_mbps),
            fcc_county_provider_count = j.county_provider_count,
            fcc_county_fiber_provider_count = j.county_fiber_provider_count,
            fcc_county_cable_provider_count = j.county_cable_provider_count,
            fcc_county_fixed_wireless_provider_count = j.county_fixed_wireless_provider_count,
            fcc_county_100_20_provider_count = j.county_100_20_provider_count,
            fcc_county_business_provider_count = j.county_business_provider_count,
            fcc_county_business_fiber_provider_count = j.county_business_fiber_provider_count,
            fcc_county_business_100_20_provider_count = j.county_business_100_20_provider_count,
            fcc_county_max_advertised_download_mbps = j.county_max_down,
            fcc_county_max_advertised_upload_mbps = j.county_max_up,
            fcc_tract_provider_count = j.tract_provider_count,
            fcc_tract_fiber_provider_count = j.tract_fiber_provider_count,
            fcc_tract_cable_provider_count = j.tract_cable_provider_count,
            fcc_tract_fixed_wireless_provider_count = j.tract_fixed_wireless_provider_count,
            fcc_tract_100_20_provider_count = j.tract_100_20_provider_count,
            fcc_tract_business_provider_count = j.tract_business_provider_count,
            fcc_tract_business_fiber_provider_count = j.tract_business_fiber_provider_count,
            fcc_tract_business_100_20_provider_count = j.tract_business_100_20_provider_count,
            fcc_tract_max_advertised_download_mbps = j.tract_max_down,
            fcc_tract_max_advertised_upload_mbps = j.tract_max_up,
            fcc_summary_json = jsonb_set(
                coalesce(c.fcc_summary_json, '{{}}'::jsonb),
                '{{location_provider_aggregate}}',
                jsonb_build_object(
                    'source', 'fcc_state_location_coverage',
                    'as_of_date', %s::text,
                    'preferred_geography_type', case
                        when j.tract_geoid is not null then 'Tract'
                        when j.county_geoid is not null then 'County'
                        else null
                    end,
                    'preferred_geoid', coalesce(j.tract_geoid, j.county_geoid),
                    'county_geoid', j.county_geoid,
                    'tract_geoid', j.tract_geoid
                ),
                true
            ),
            fcc_bdc_status = case
                when coalesce(j.tract_geoid, j.county_geoid) is not null then 'fcc_location_provider_joined'
                else c.fcc_bdc_status
            end,
            updated_at = now()
        from joined j
        where c.master_id = j.master_id
            and coalesce(j.tract_geoid, j.county_geoid) is not null
        """,
        (as_of_date, as_of_date, as_of_date),
    )
    return cur.rowcount
def main() -> int:
    """Command-line entry point. Returns the process exit status (0 on success).

    All work runs on one connection from ``get_conn()``:

    1. Ensure the tables exist and seed the Connecticut crosswalk.
    2. Resolve the as-of date (``--as-of-date`` or the latest catalog date)
       and the target states, counties and tracts. ``--refresh`` first deletes
       this date's detail, aggregate and progress rows.
    3. Download and process each matching Location Coverage file. Files
       already recorded in PROGRESS_TABLE are skipped unless ``--no-resume``.
    4. Rebuild AGG_TABLE for the date and, unless ``--no-update-connection``,
       copy the aggregates onto CONNECTION_TABLE.

    Raises:
        RuntimeError: if ``--as-of-date`` is given but ``parse_date`` returns
            None for it, or if no target states are found.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--as-of-date", help="FCC availability as-of date; defaults to latest catalog date.")
    parser.add_argument("--states", nargs="*", help="Optional state FIPS list, e.g. 11 34 51.")
    parser.add_argument("--technology-codes", nargs="*", default=list(TERRESTRIAL_TECHNOLOGY_CODES))
    parser.add_argument("--limit-files", type=int, help="Process only the first N matching files.")
    parser.add_argument("--chunksize", type=int, default=500_000)
    parser.add_argument("--refresh", action="store_true", help="Delete existing location-provider rows for this as-of date first.")
    parser.add_argument("--no-resume", action="store_true", help="Reprocess files even if marked complete.")
    parser.add_argument("--no-update-connection", action="store_true", help="Build aggregate tables but do not update data_center_broadband_connection.")
    args = parser.parse_args()
    load_zsh_secrets()
    require_env(["PGWEB_HOST", "PGWEB_PORT", "PGWEB_USER", "PGWEB_PASSWORD"])
    as_of_date = parse_date(args.as_of_date) if args.as_of_date else None
    if as_of_date is None and args.as_of_date:
        raise RuntimeError(f"Invalid --as-of-date: {args.as_of_date}")
    technology_codes = normalize_codes(args.technology_codes)
    requested_states = tuple(s.zfill(2) for s in args.states) if args.states else None
    with get_conn() as conn:
        with conn.cursor() as cur:
            create_tables(cur)
            seed_geoid_crosswalk(cur)
            as_of_date = as_of_date or latest_catalog_date(cur)
            states, counties, tracts = target_geographies(cur, requested_states)
            if not states:
                raise RuntimeError("No target data-center states found.")
            if args.refresh:
                cur.execute(f"delete from {DETAIL_TABLE} where as_of_date = %s", (as_of_date,))
                cur.execute(f"delete from {AGG_TABLE} where as_of_date = %s", (as_of_date,))
                cur.execute(f"delete from {PROGRESS_TABLE} where as_of_date = %s", (as_of_date,))
            files = catalog_files(cur, as_of_date, states, technology_codes, args.limit_files)
        conn.commit()
        print(f"FCC as_of_date: {as_of_date}")
        print(f"Target states: {len(states):,} | counties: {len(counties):,} | tracts: {len(tracts):,}")
        print(f"Location coverage files selected: {len(files):,}")
        total_matched_rows = 0
        total_provider_geo_rows = 0
        with tempfile.TemporaryDirectory(prefix="fcc_bdc_location_") as temp:
            temp_dir = Path(temp)
            for idx, file_row in enumerate(files, start=1):
                file_id = file_row["file_id"]
                with conn.cursor() as cur:
                    skip = (not args.no_resume) and progress_done(cur, as_of_date, file_id)
                if skip:
                    print(f"[{idx:,}/{len(files):,}] skip file_id={file_id} already processed")
                    continue
                # record_count comes straight from the catalog and may be NULL;
                # format(None, ",") would raise TypeError and abort the run.
                record_count = file_row["record_count"]
                records_label = "unknown" if record_count is None else f"{record_count:,}"
                print(
                    f"[{idx:,}/{len(files):,}] file_id={file_id} state={file_row['state_fips']} "
                    f"tech={file_row['technology_code']} records={records_label}"
                )
                matched_rows, provider_geo_rows = process_file(
                    conn,
                    file_row,
                    as_of_date,
                    counties,
                    tracts,
                    args.chunksize,
                    temp_dir,
                )
                total_matched_rows += matched_rows
                total_provider_geo_rows += provider_geo_rows
                print(
                    f"  complete file_id={file_id}: matched row-events={matched_rows:,}, "
                    f"provider-geography rows={provider_geo_rows:,}"
                )
        # The aggregate is rebuilt from every detail row for the date, including
        # files skipped above because an earlier run had already processed them.
        with conn.cursor() as cur:
            agg_rows = rebuild_aggregate(cur, as_of_date)
            updated_rows = 0
            if not args.no_update_connection:
                updated_rows = update_connection_table(cur, as_of_date)
        conn.commit()
    print(f"New matched row-events this run: {total_matched_rows:,}")
    print(f"New provider-geography detail rows this run: {total_provider_geo_rows:,}")
    print(f"{AGG_TABLE}: rebuilt {agg_rows:,} geography rows")
    if not args.no_update_connection:
        print(f"{CONNECTION_TABLE}: updated {updated_rows:,} rows with location-provider aggregates")
    return 0
if __name__ == "__main__":
    # main() returns an int, which becomes the process exit status.
    raise SystemExit(main())