#!/usr/bin/env python3 """Build a ZCTA-level enrichment table for data-center points, mirroring create_data_center_census_tract_table.py but at ZIP Code Tabulation Area geography. ZCTA cartographic boundaries are only redrawn each decennial census, so the boundary vintage is 2020 (cb_2020_us_zcta520_500k) even though the ACS demographic profile is the 2024 5-year release. """ import argparse import csv import json import os import subprocess import urllib.parse import urllib.request from decimal import Decimal from pathlib import Path import psycopg2 from psycopg2.extras import execute_values DB_NAME = "data_centers" POINT_TABLE = "public.master_data_centers" POINT_ID_COL = "master_id" POINT_ZCTA_COL = "zcta_geoid" BOUNDARY_STAGE_TABLE = "public._dc_zcta_boundaries_2020" ACS_STAGE_TABLE = "public._dc_zcta_acs_2024" FINAL_TABLE = "public.data_center_zcta_2024" ACS_YEAR = 2024 ACS_SOURCE = "ACS 2024 5-year profile" ZCTA_BOUNDARY_VINTAGE = "2020 ZCTA5 cartographic boundary (cb_2020_us_zcta520_500k)" PROJECT_ROOT = Path(__file__).parent.parent ZCTA_ZIP = PROJECT_ROOT / "data" / "cb_2020_us_zcta520_500k.zip" ZCTA_ZIP_URL = ( "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip" ) ACS_AUDIT_CSV = PROJECT_ROOT / "data" / "zcta_acs_2024_national.csv" # Same ACS Data Profile variables as the tract table, so the two enrichment # tables stay directly comparable. ACS_VARIABLES = { "DP05_0001E": "population", "DP05_0018E": "median_age", "DP02_0001E": "households", "DP02_0016E": "avg_household_size", "DP02_0067PE": "high_school_or_higher_pct", "DP02_0068PE": "bachelor_or_higher_pct", "DP02_0154PE": "broadband_subscription_pct", "DP03_0001E": "population_16_over", "DP03_0002E": "labor_force", "DP03_0005E": "unemployed", "DP03_0009PE": "unemployment_rate", "DP03_0032E": "industry_total_workers", "DP03_0033E": "industry_agriculture_mining_workers", "DP03_0034E": "industry_construction_workers", "DP03_0035E": "industry_manufacturing_workers", "DP03_0036E": "industry_wholesale_trade_workers", "DP03_0037E": "industry_retail_trade_workers", "DP03_0038E": "industry_transportation_warehousing_utilities_workers", "DP03_0039E": "industry_information_workers", "DP03_0040E": "industry_finance_real_estate_workers", "DP03_0041E": "industry_professional_management_admin_workers", "DP03_0042E": "industry_education_health_social_workers", "DP03_0043E": "industry_arts_entertainment_food_workers", "DP03_0044E": "industry_other_services_workers", "DP03_0045E": "industry_public_administration_workers", "DP03_0062E": "median_household_income", "DP03_0088E": "per_capita_income", "DP03_0119PE": "family_poverty_rate", "DP03_0128PE": "poverty_rate", "DP05_0090E": "hispanic_latino_population", "DP05_0090PE": "hispanic_latino_pct", "DP05_0096E": "non_hispanic_white_population", "DP05_0096PE": "non_hispanic_white_pct", "DP05_0097E": "non_hispanic_black_population", "DP05_0097PE": "non_hispanic_black_pct", "DP05_0099E": "non_hispanic_asian_population", "DP05_0099PE": "non_hispanic_asian_pct", } COUNT_COLUMNS = { "population", "households", "population_16_over", "labor_force", "unemployed", "industry_total_workers", "industry_agriculture_mining_workers", "industry_construction_workers", "industry_manufacturing_workers", "industry_wholesale_trade_workers", "industry_retail_trade_workers", "industry_transportation_warehousing_utilities_workers", "industry_information_workers", "industry_finance_real_estate_workers", "industry_professional_management_admin_workers", "industry_education_health_social_workers", "industry_arts_entertainment_food_workers", "industry_other_services_workers", "industry_public_administration_workers", "median_household_income", "per_capita_income", "hispanic_latino_population", "non_hispanic_white_population", "non_hispanic_black_population", "non_hispanic_asian_population", } NUMERIC_COLUMNS = set(ACS_VARIABLES.values()) - COUNT_COLUMNS INDUSTRY_COLUMNS = { "industry_agriculture_mining_workers": "Agriculture, forestry, fishing and hunting, and mining", "industry_construction_workers": "Construction", "industry_manufacturing_workers": "Manufacturing", "industry_wholesale_trade_workers": "Wholesale trade", "industry_retail_trade_workers": "Retail trade", "industry_transportation_warehousing_utilities_workers": "Transportation and warehousing, and utilities", "industry_information_workers": "Information", "industry_finance_real_estate_workers": "Finance and insurance, and real estate and rental and leasing", "industry_professional_management_admin_workers": "Professional, scientific, management, administrative, and waste management services", "industry_education_health_social_workers": "Educational services, and health care and social assistance", "industry_arts_entertainment_food_workers": "Arts, entertainment, recreation, accommodation, and food services", "industry_other_services_workers": "Other services, except public administration", "industry_public_administration_workers": "Public administration", } SPECIAL_VALUES = { "-666666666", "-888888888", "-999999999", "-222222222", "-333333333", "-555555555", "-666666666.0", "-888888888.0", "-999999999.0", "-222222222.0", "-333333333.0", "-555555555.0", } def connect(): return psycopg2.connect( host=os.environ["PGWEB_HOST"], port=os.environ["PGWEB_PORT"], user=os.environ["PGWEB_USER"], password=os.environ["PGWEB_PASSWORD"], dbname=DB_NAME, ) def ensure_final_table_absent(conn): with conn.cursor() as cur: cur.execute("select to_regclass(%s)", (FINAL_TABLE,)) if cur.fetchone()[0] is not None: raise RuntimeError( f"Target table {FINAL_TABLE} already exists; refusing to overwrite it." ) def drop_final_table_if_exists(conn): with conn: with conn.cursor() as cur: cur.execute(f"drop table if exists {FINAL_TABLE}") def download_zcta_boundaries(): if ZCTA_ZIP.exists() and ZCTA_ZIP.stat().st_size > 30_000_000: return tmp_path = ZCTA_ZIP.with_suffix(".zip.part") with urllib.request.urlopen(ZCTA_ZIP_URL, timeout=180) as response: with tmp_path.open("wb") as out: while True: chunk = response.read(1024 * 1024) if not chunk: break out.write(chunk) tmp_path.rename(ZCTA_ZIP) def import_zcta_boundaries(): env = os.environ.copy() env.update( { "PGHOST": os.environ["PGWEB_HOST"], "PGPORT": os.environ["PGWEB_PORT"], "PGUSER": os.environ["PGWEB_USER"], "PGPASSWORD": os.environ["PGWEB_PASSWORD"], "PGDATABASE": DB_NAME, } ) cmd = [ "ogr2ogr", "-f", "PostgreSQL", "PG:dbname=data_centers", f"/vsizip/{ZCTA_ZIP.resolve()}/cb_2020_us_zcta520_500k.shp", "-nln", BOUNDARY_STAGE_TABLE, "-overwrite", "-nlt", "MULTIPOLYGON", "-t_srs", "EPSG:4326", "-lco", "GEOMETRY_NAME=geom", "-lco", "FID=gid", ] subprocess.run(cmd, check=True, env=env) def fetch_acs_national(): """ZCTAs are queried nationally in one call; they don't nest cleanly inside a single state, so there's no per-state loop here.""" variables = ["NAME", *ACS_VARIABLES.keys()] params = { "get": ",".join(variables), "for": "zip code tabulation area:*", } api_key = os.environ.get("CENSUS_API_KEY") if api_key: params["key"] = api_key url = ( f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?" + urllib.parse.urlencode(params) ) try: with urllib.request.urlopen(url, timeout=180) as response: body = response.read().decode("utf-8") except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") raise RuntimeError( f"Census ACS request failed for ZCTA geography: HTTP {exc.code} — {body[:300]}" ) from exc try: data = json.loads(body) except json.JSONDecodeError as exc: raise RuntimeError( f"Census ACS returned non-JSON for ZCTA geography: {body[:300]}" ) from exc header = data[0] rows = [] for values in data[1:]: raw = dict(zip(header, values)) row = { "geoid": raw["zip code tabulation area"], "acs_name": raw["NAME"], } for acs_var, column in ACS_VARIABLES.items(): row[column] = clean_acs_value(raw.get(acs_var), column) add_primary_industry(row) rows.append(row) return rows def clean_acs_value(value, column): if value in (None, "", "null") or value in SPECIAL_VALUES: return None if column in COUNT_COLUMNS: return int(Decimal(value)) if column in NUMERIC_COLUMNS: return Decimal(value) return value def add_primary_industry(row): industry_total = row.get("industry_total_workers") best_column = None best_value = None for column in INDUSTRY_COLUMNS: value = row.get(column) if value is None: continue if best_value is None or value > best_value: best_column = column best_value = value row["primary_industry"] = INDUSTRY_COLUMNS.get(best_column) row["primary_industry_workers"] = best_value if industry_total and best_value is not None: row["primary_industry_pct"] = Decimal(best_value * 100) / Decimal(industry_total) else: row["primary_industry_pct"] = None def fetch_acs(): rows = fetch_acs_national() fieldnames = [ "geoid", "acs_name", *ACS_VARIABLES.values(), "primary_industry", "primary_industry_workers", "primary_industry_pct", ] with ACS_AUDIT_CSV.open("w", newline="", encoding="utf-8") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) return rows, fieldnames def load_acs_stage(conn, rows, fieldnames): with conn: with conn.cursor() as cur: cur.execute(f"drop table if exists {ACS_STAGE_TABLE}") cur.execute( f""" create table {ACS_STAGE_TABLE} ( geoid text primary key, acs_name text, population integer, median_age numeric, households integer, avg_household_size numeric, high_school_or_higher_pct numeric, bachelor_or_higher_pct numeric, broadband_subscription_pct numeric, population_16_over integer, labor_force integer, unemployed integer, unemployment_rate numeric, industry_total_workers integer, industry_agriculture_mining_workers integer, industry_construction_workers integer, industry_manufacturing_workers integer, industry_wholesale_trade_workers integer, industry_retail_trade_workers integer, industry_transportation_warehousing_utilities_workers integer, industry_information_workers integer, industry_finance_real_estate_workers integer, industry_professional_management_admin_workers integer, industry_education_health_social_workers integer, industry_arts_entertainment_food_workers integer, industry_other_services_workers integer, industry_public_administration_workers integer, median_household_income integer, per_capita_income integer, family_poverty_rate numeric, poverty_rate numeric, hispanic_latino_population integer, hispanic_latino_pct numeric, non_hispanic_white_population integer, non_hispanic_white_pct numeric, non_hispanic_black_population integer, non_hispanic_black_pct numeric, non_hispanic_asian_population integer, non_hispanic_asian_pct numeric, primary_industry text, primary_industry_workers integer, primary_industry_pct numeric ) """ ) values = [tuple(row.get(column) for column in fieldnames) for row in rows] execute_values( cur, f"insert into {ACS_STAGE_TABLE} ({', '.join(fieldnames)}) values %s", values, page_size=1000, ) cur.execute(f"analyze {ACS_STAGE_TABLE}") def create_final_table(conn): with conn: with conn.cursor() as cur: cur.execute("drop index if exists _dc_zcta_boundaries_2020_geom_gix") cur.execute( f"create index _dc_zcta_boundaries_2020_geom_gix on {BOUNDARY_STAGE_TABLE} using gist (geom)" ) cur.execute(f"analyze {BOUNDARY_STAGE_TABLE}") cur.execute( f""" create table {FINAL_TABLE} as with dc_zctas as ( select t.zcta5ce20 as geoid, count(*)::integer as data_center_count, count(*) filter (where dc.source = 'curated')::integer as curated_only_data_center_count, count(*) filter (where dc.source = 'merged')::integer as merged_data_center_count, count(*) filter (where dc.source = 'osm')::integer as osm_only_data_center_count, array_agg(dc.{POINT_ID_COL} order by dc.{POINT_ID_COL}) as data_center_ids, array_agg(distinct dc.operator) filter (where dc.operator is not null) as operators from {BOUNDARY_STAGE_TABLE} t join {POINT_TABLE} dc on t.geom && dc.geom and ST_Covers(t.geom, dc.geom) group by t.zcta5ce20 ) select t.zcta5ce20 as geoid, t.name20 as zcta_name, t.aland20::bigint as land_area_sqm, t.awater20::bigint as water_area_sqm, {ACS_YEAR}::integer as acs_year, '{ACS_SOURCE}'::text as acs_source, '{ZCTA_BOUNDARY_VINTAGE}'::text as boundary_source, a.acs_name, d.data_center_count, d.curated_only_data_center_count, d.merged_data_center_count, d.osm_only_data_center_count, d.data_center_ids, d.operators, a.population, a.median_age, a.households, a.avg_household_size, a.high_school_or_higher_pct, a.bachelor_or_higher_pct, a.broadband_subscription_pct, a.population_16_over, a.labor_force, a.unemployed, a.unemployment_rate, a.median_household_income, a.per_capita_income, a.family_poverty_rate, a.poverty_rate, a.hispanic_latino_population, a.hispanic_latino_pct, a.non_hispanic_white_population, a.non_hispanic_white_pct, a.non_hispanic_black_population, a.non_hispanic_black_pct, a.non_hispanic_asian_population, a.non_hispanic_asian_pct, a.industry_total_workers, a.industry_agriculture_mining_workers, a.industry_construction_workers, a.industry_manufacturing_workers, a.industry_wholesale_trade_workers, a.industry_retail_trade_workers, a.industry_transportation_warehousing_utilities_workers, a.industry_information_workers, a.industry_finance_real_estate_workers, a.industry_professional_management_admin_workers, a.industry_education_health_social_workers, a.industry_arts_entertainment_food_workers, a.industry_other_services_workers, a.industry_public_administration_workers, a.primary_industry, a.primary_industry_workers, a.primary_industry_pct, t.geom::geometry(MultiPolygon, 4326) as geom from {BOUNDARY_STAGE_TABLE} t join dc_zctas d on d.geoid = t.zcta5ce20 left join {ACS_STAGE_TABLE} a on a.geoid = t.zcta5ce20 """ ) cur.execute(f"alter table {FINAL_TABLE} add primary key (geoid)") cur.execute( f"create index data_center_zcta_2024_geom_gix on {FINAL_TABLE} using gist (geom)" ) cur.execute( f"create index data_center_zcta_2024_dc_count_idx on {FINAL_TABLE} (data_center_count desc)" ) cur.execute( f""" comment on table {FINAL_TABLE} is 'ZIP Code Tabulation Areas (2020 boundary vintage) containing records from public.master_data_centers (curated + OSM merged), enriched with ACS 2024 5-year profile demographics and derived primary industry fields. Companion table to data_center_census_tracts_2024 at coarser ZCTA geography.' """ ) cur.execute(f"analyze {FINAL_TABLE}") def assign_point_zcta_geoids(conn): with conn: with conn.cursor() as cur: cur.execute( f"alter table {POINT_TABLE} add column if not exists {POINT_ZCTA_COL} text" ) cur.execute( f""" update {POINT_TABLE} dc set {POINT_ZCTA_COL} = matched.geoid from ( select dc_inner.{POINT_ID_COL} as point_id, ( select t.zcta5ce20 from {BOUNDARY_STAGE_TABLE} t where t.geom && dc_inner.geom and st_covers(t.geom, dc_inner.geom) order by t.zcta5ce20 limit 1 ) as geoid from {POINT_TABLE} dc_inner ) matched where dc.{POINT_ID_COL} = matched.point_id """ ) cur.execute( f"create index if not exists master_data_centers_zcta_geoid_idx on {POINT_TABLE} ({POINT_ZCTA_COL})" ) cur.execute(f"analyze {POINT_TABLE}") def validate(conn): with conn.cursor() as cur: cur.execute( f""" select count(*)::integer as zcta_rows, coalesce(sum(data_center_count), 0)::integer as assigned_data_centers, count(*) filter (where geom is not null)::integer as geom_rows from {FINAL_TABLE} """ ) summary = cur.fetchone() cur.execute(f"select count(*)::integer from {POINT_TABLE}") total_points = cur.fetchone()[0] cur.execute( f""" select count(*)::integer from {POINT_TABLE} where {POINT_ZCTA_COL} is null """ ) unassigned_points = cur.fetchone()[0] cur.execute( f""" select count(*)::integer from {FINAL_TABLE} where population is null """ ) missing_acs = cur.fetchone()[0] return summary, total_points, unassigned_points, missing_acs def main(): parser = argparse.ArgumentParser( description="Build ZCTA-level enrichment table for data-center points." ) parser.add_argument( "--replace-final", action="store_true", help="Drop and rebuild the final ZCTA table if it already exists.", ) args = parser.parse_args() conn = connect() try: if args.replace_final: drop_final_table_if_exists(conn) else: ensure_final_table_absent(conn) finally: conn.close() download_zcta_boundaries() import_zcta_boundaries() acs_rows, acs_fieldnames = fetch_acs() conn = connect() try: if args.replace_final: drop_final_table_if_exists(conn) else: ensure_final_table_absent(conn) load_acs_stage(conn, acs_rows, acs_fieldnames) create_final_table(conn) assign_point_zcta_geoids(conn) summary, total_points, unassigned_points, missing_acs = validate(conn) finally: conn.close() print(f"loaded {len(acs_rows)} ACS ZCTA rows into {ACS_STAGE_TABLE}") print(f"created {FINAL_TABLE}") print( "zcta_rows={0} assigned_data_centers={1} geom_rows={2} source_points={3}".format( summary[0], summary[1], summary[2], total_points ) ) print(f"points_unassigned_to_zcta={unassigned_points}") print(f"zctas_missing_acs_population={missing_acs}") if __name__ == "__main__": main()