From 6cb7c0334c090c539fdaac4de7b1b77fada1a791 Mon Sep 17 00:00:00 2001 From: dadams Date: Sat, 20 Jun 2026 20:59:19 -0700 Subject: [PATCH] Add ZCTA-level enrichment table build script Mirrors create_data_center_census_tract_table.py but at ZIP Code Tabulation Area geography (2020 boundary vintage, since ZCTAs are only redrawn each decennial census). Builds data_center_zcta_2024 (607 ZCTAs hosting >=1 facility, joined to ACS 2024 5-year demographics) and adds master_data_centers.zcta_geoid, parallel to the existing tract geoid column. Used to verify the income/education premium for DC host communities holds at ZIP-code resolution, not just census-tract resolution, for the dc-siting-politics paper. Co-Authored-By: Claude Sonnet 4.6 --- scripts/create_data_center_zcta_table.py | 591 +++++++++++++++++++++++ 1 file changed, 591 insertions(+) create mode 100644 scripts/create_data_center_zcta_table.py diff --git a/scripts/create_data_center_zcta_table.py b/scripts/create_data_center_zcta_table.py new file mode 100644 index 0000000..48d8145 --- /dev/null +++ b/scripts/create_data_center_zcta_table.py @@ -0,0 +1,591 @@ +#!/usr/bin/env python3 +"""Build a ZCTA-level enrichment table for data-center points, mirroring +create_data_center_census_tract_table.py but at ZIP Code Tabulation Area +geography. ZCTA cartographic boundaries are only redrawn each decennial +census, so the boundary vintage is 2020 (cb_2020_us_zcta520_500k) even +though the ACS demographic profile is the 2024 5-year release. +""" +import argparse +import csv +import json +import os +import subprocess +import urllib.parse +import urllib.request +from decimal import Decimal +from pathlib import Path + +import psycopg2 +from psycopg2.extras import execute_values + + +DB_NAME = "data_centers" +POINT_TABLE = "public.master_data_centers" +POINT_ID_COL = "master_id" +POINT_ZCTA_COL = "zcta_geoid" +BOUNDARY_STAGE_TABLE = "public._dc_zcta_boundaries_2020" +ACS_STAGE_TABLE = "public._dc_zcta_acs_2024" +FINAL_TABLE = "public.data_center_zcta_2024" + +ACS_YEAR = 2024 +ACS_SOURCE = "ACS 2024 5-year profile" +ZCTA_BOUNDARY_VINTAGE = "2020 ZCTA5 cartographic boundary (cb_2020_us_zcta520_500k)" +PROJECT_ROOT = Path(__file__).parent.parent +ZCTA_ZIP = PROJECT_ROOT / "data" / "cb_2020_us_zcta520_500k.zip" +ZCTA_ZIP_URL = ( + "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip" +) +ACS_AUDIT_CSV = PROJECT_ROOT / "data" / "zcta_acs_2024_national.csv" + +# Same ACS Data Profile variables as the tract table, so the two enrichment +# tables stay directly comparable. +ACS_VARIABLES = { + "DP05_0001E": "population", + "DP05_0018E": "median_age", + "DP02_0001E": "households", + "DP02_0016E": "avg_household_size", + "DP02_0067PE": "high_school_or_higher_pct", + "DP02_0068PE": "bachelor_or_higher_pct", + "DP02_0154PE": "broadband_subscription_pct", + "DP03_0001E": "population_16_over", + "DP03_0002E": "labor_force", + "DP03_0005E": "unemployed", + "DP03_0009PE": "unemployment_rate", + "DP03_0032E": "industry_total_workers", + "DP03_0033E": "industry_agriculture_mining_workers", + "DP03_0034E": "industry_construction_workers", + "DP03_0035E": "industry_manufacturing_workers", + "DP03_0036E": "industry_wholesale_trade_workers", + "DP03_0037E": "industry_retail_trade_workers", + "DP03_0038E": "industry_transportation_warehousing_utilities_workers", + "DP03_0039E": "industry_information_workers", + "DP03_0040E": "industry_finance_real_estate_workers", + "DP03_0041E": "industry_professional_management_admin_workers", + "DP03_0042E": "industry_education_health_social_workers", + "DP03_0043E": "industry_arts_entertainment_food_workers", + "DP03_0044E": "industry_other_services_workers", + "DP03_0045E": "industry_public_administration_workers", + "DP03_0062E": "median_household_income", + "DP03_0088E": "per_capita_income", + "DP03_0119PE": "family_poverty_rate", + "DP03_0128PE": "poverty_rate", + "DP05_0090E": "hispanic_latino_population", + "DP05_0090PE": "hispanic_latino_pct", + "DP05_0096E": "non_hispanic_white_population", + "DP05_0096PE": "non_hispanic_white_pct", + "DP05_0097E": "non_hispanic_black_population", + "DP05_0097PE": "non_hispanic_black_pct", + "DP05_0099E": "non_hispanic_asian_population", + "DP05_0099PE": "non_hispanic_asian_pct", +} + +COUNT_COLUMNS = { + "population", + "households", + "population_16_over", + "labor_force", + "unemployed", + "industry_total_workers", + "industry_agriculture_mining_workers", + "industry_construction_workers", + "industry_manufacturing_workers", + "industry_wholesale_trade_workers", + "industry_retail_trade_workers", + "industry_transportation_warehousing_utilities_workers", + "industry_information_workers", + "industry_finance_real_estate_workers", + "industry_professional_management_admin_workers", + "industry_education_health_social_workers", + "industry_arts_entertainment_food_workers", + "industry_other_services_workers", + "industry_public_administration_workers", + "median_household_income", + "per_capita_income", + "hispanic_latino_population", + "non_hispanic_white_population", + "non_hispanic_black_population", + "non_hispanic_asian_population", +} + +NUMERIC_COLUMNS = set(ACS_VARIABLES.values()) - COUNT_COLUMNS + +INDUSTRY_COLUMNS = { + "industry_agriculture_mining_workers": "Agriculture, forestry, fishing and hunting, and mining", + "industry_construction_workers": "Construction", + "industry_manufacturing_workers": "Manufacturing", + "industry_wholesale_trade_workers": "Wholesale trade", + "industry_retail_trade_workers": "Retail trade", + "industry_transportation_warehousing_utilities_workers": "Transportation and warehousing, and utilities", + "industry_information_workers": "Information", + "industry_finance_real_estate_workers": "Finance and insurance, and real estate and rental and leasing", + "industry_professional_management_admin_workers": "Professional, scientific, management, administrative, and waste management services", + "industry_education_health_social_workers": "Educational services, and health care and social assistance", + "industry_arts_entertainment_food_workers": "Arts, entertainment, recreation, accommodation, and food services", + "industry_other_services_workers": "Other services, except public administration", + "industry_public_administration_workers": "Public administration", +} + +SPECIAL_VALUES = { + "-666666666", + "-888888888", + "-999999999", + "-222222222", + "-333333333", + "-555555555", + "-666666666.0", + "-888888888.0", + "-999999999.0", + "-222222222.0", + "-333333333.0", + "-555555555.0", +} + + +def connect(): + return psycopg2.connect( + host=os.environ["PGWEB_HOST"], + port=os.environ["PGWEB_PORT"], + user=os.environ["PGWEB_USER"], + password=os.environ["PGWEB_PASSWORD"], + dbname=DB_NAME, + ) + + +def ensure_final_table_absent(conn): + with conn.cursor() as cur: + cur.execute("select to_regclass(%s)", (FINAL_TABLE,)) + if cur.fetchone()[0] is not None: + raise RuntimeError( + f"Target table {FINAL_TABLE} already exists; refusing to overwrite it." + ) + + +def drop_final_table_if_exists(conn): + with conn: + with conn.cursor() as cur: + cur.execute(f"drop table if exists {FINAL_TABLE}") + + +def download_zcta_boundaries(): + if ZCTA_ZIP.exists() and ZCTA_ZIP.stat().st_size > 30_000_000: + return + tmp_path = ZCTA_ZIP.with_suffix(".zip.part") + with urllib.request.urlopen(ZCTA_ZIP_URL, timeout=180) as response: + with tmp_path.open("wb") as out: + while True: + chunk = response.read(1024 * 1024) + if not chunk: + break + out.write(chunk) + tmp_path.rename(ZCTA_ZIP) + + +def import_zcta_boundaries(): + env = os.environ.copy() + env.update( + { + "PGHOST": os.environ["PGWEB_HOST"], + "PGPORT": os.environ["PGWEB_PORT"], + "PGUSER": os.environ["PGWEB_USER"], + "PGPASSWORD": os.environ["PGWEB_PASSWORD"], + "PGDATABASE": DB_NAME, + } + ) + cmd = [ + "ogr2ogr", + "-f", + "PostgreSQL", + "PG:dbname=data_centers", + f"/vsizip/{ZCTA_ZIP.resolve()}/cb_2020_us_zcta520_500k.shp", + "-nln", + BOUNDARY_STAGE_TABLE, + "-overwrite", + "-nlt", + "MULTIPOLYGON", + "-t_srs", + "EPSG:4326", + "-lco", + "GEOMETRY_NAME=geom", + "-lco", + "FID=gid", + ] + subprocess.run(cmd, check=True, env=env) + + +def fetch_acs_national(): + """ZCTAs are queried nationally in one call; they don't nest cleanly + inside a single state, so there's no per-state loop here.""" + variables = ["NAME", *ACS_VARIABLES.keys()] + params = { + "get": ",".join(variables), + "for": "zip code tabulation area:*", + } + api_key = os.environ.get("CENSUS_API_KEY") + if api_key: + params["key"] = api_key + url = ( + f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?" + + urllib.parse.urlencode(params) + ) + try: + with urllib.request.urlopen(url, timeout=180) as response: + body = response.read().decode("utf-8") + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + raise RuntimeError( + f"Census ACS request failed for ZCTA geography: HTTP {exc.code} — {body[:300]}" + ) from exc + try: + data = json.loads(body) + except json.JSONDecodeError as exc: + raise RuntimeError( + f"Census ACS returned non-JSON for ZCTA geography: {body[:300]}" + ) from exc + + header = data[0] + rows = [] + for values in data[1:]: + raw = dict(zip(header, values)) + row = { + "geoid": raw["zip code tabulation area"], + "acs_name": raw["NAME"], + } + for acs_var, column in ACS_VARIABLES.items(): + row[column] = clean_acs_value(raw.get(acs_var), column) + add_primary_industry(row) + rows.append(row) + return rows + + +def clean_acs_value(value, column): + if value in (None, "", "null") or value in SPECIAL_VALUES: + return None + if column in COUNT_COLUMNS: + return int(Decimal(value)) + if column in NUMERIC_COLUMNS: + return Decimal(value) + return value + + +def add_primary_industry(row): + industry_total = row.get("industry_total_workers") + best_column = None + best_value = None + for column in INDUSTRY_COLUMNS: + value = row.get(column) + if value is None: + continue + if best_value is None or value > best_value: + best_column = column + best_value = value + + row["primary_industry"] = INDUSTRY_COLUMNS.get(best_column) + row["primary_industry_workers"] = best_value + if industry_total and best_value is not None: + row["primary_industry_pct"] = Decimal(best_value * 100) / Decimal(industry_total) + else: + row["primary_industry_pct"] = None + + +def fetch_acs(): + rows = fetch_acs_national() + + fieldnames = [ + "geoid", + "acs_name", + *ACS_VARIABLES.values(), + "primary_industry", + "primary_industry_workers", + "primary_industry_pct", + ] + with ACS_AUDIT_CSV.open("w", newline="", encoding="utf-8") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + return rows, fieldnames + + +def load_acs_stage(conn, rows, fieldnames): + with conn: + with conn.cursor() as cur: + cur.execute(f"drop table if exists {ACS_STAGE_TABLE}") + cur.execute( + f""" + create table {ACS_STAGE_TABLE} ( + geoid text primary key, + acs_name text, + population integer, + median_age numeric, + households integer, + avg_household_size numeric, + high_school_or_higher_pct numeric, + bachelor_or_higher_pct numeric, + broadband_subscription_pct numeric, + population_16_over integer, + labor_force integer, + unemployed integer, + unemployment_rate numeric, + industry_total_workers integer, + industry_agriculture_mining_workers integer, + industry_construction_workers integer, + industry_manufacturing_workers integer, + industry_wholesale_trade_workers integer, + industry_retail_trade_workers integer, + industry_transportation_warehousing_utilities_workers integer, + industry_information_workers integer, + industry_finance_real_estate_workers integer, + industry_professional_management_admin_workers integer, + industry_education_health_social_workers integer, + industry_arts_entertainment_food_workers integer, + industry_other_services_workers integer, + industry_public_administration_workers integer, + median_household_income integer, + per_capita_income integer, + family_poverty_rate numeric, + poverty_rate numeric, + hispanic_latino_population integer, + hispanic_latino_pct numeric, + non_hispanic_white_population integer, + non_hispanic_white_pct numeric, + non_hispanic_black_population integer, + non_hispanic_black_pct numeric, + non_hispanic_asian_population integer, + non_hispanic_asian_pct numeric, + primary_industry text, + primary_industry_workers integer, + primary_industry_pct numeric + ) + """ + ) + values = [tuple(row.get(column) for column in fieldnames) for row in rows] + execute_values( + cur, + f"insert into {ACS_STAGE_TABLE} ({', '.join(fieldnames)}) values %s", + values, + page_size=1000, + ) + cur.execute(f"analyze {ACS_STAGE_TABLE}") + + +def create_final_table(conn): + with conn: + with conn.cursor() as cur: + cur.execute("drop index if exists _dc_zcta_boundaries_2020_geom_gix") + cur.execute( + f"create index _dc_zcta_boundaries_2020_geom_gix on {BOUNDARY_STAGE_TABLE} using gist (geom)" + ) + cur.execute(f"analyze {BOUNDARY_STAGE_TABLE}") + cur.execute( + f""" + create table {FINAL_TABLE} as + with dc_zctas as ( + select + t.zcta5ce20 as geoid, + count(*)::integer as data_center_count, + count(*) filter (where dc.source = 'curated')::integer + as curated_only_data_center_count, + count(*) filter (where dc.source = 'merged')::integer + as merged_data_center_count, + count(*) filter (where dc.source = 'osm')::integer + as osm_only_data_center_count, + array_agg(dc.{POINT_ID_COL} order by dc.{POINT_ID_COL}) as data_center_ids, + array_agg(distinct dc.operator) filter (where dc.operator is not null) + as operators + from {BOUNDARY_STAGE_TABLE} t + join {POINT_TABLE} dc + on t.geom && dc.geom + and ST_Covers(t.geom, dc.geom) + group by t.zcta5ce20 + ) + select + t.zcta5ce20 as geoid, + t.name20 as zcta_name, + t.aland20::bigint as land_area_sqm, + t.awater20::bigint as water_area_sqm, + {ACS_YEAR}::integer as acs_year, + '{ACS_SOURCE}'::text as acs_source, + '{ZCTA_BOUNDARY_VINTAGE}'::text as boundary_source, + a.acs_name, + d.data_center_count, + d.curated_only_data_center_count, + d.merged_data_center_count, + d.osm_only_data_center_count, + d.data_center_ids, + d.operators, + a.population, + a.median_age, + a.households, + a.avg_household_size, + a.high_school_or_higher_pct, + a.bachelor_or_higher_pct, + a.broadband_subscription_pct, + a.population_16_over, + a.labor_force, + a.unemployed, + a.unemployment_rate, + a.median_household_income, + a.per_capita_income, + a.family_poverty_rate, + a.poverty_rate, + a.hispanic_latino_population, + a.hispanic_latino_pct, + a.non_hispanic_white_population, + a.non_hispanic_white_pct, + a.non_hispanic_black_population, + a.non_hispanic_black_pct, + a.non_hispanic_asian_population, + a.non_hispanic_asian_pct, + a.industry_total_workers, + a.industry_agriculture_mining_workers, + a.industry_construction_workers, + a.industry_manufacturing_workers, + a.industry_wholesale_trade_workers, + a.industry_retail_trade_workers, + a.industry_transportation_warehousing_utilities_workers, + a.industry_information_workers, + a.industry_finance_real_estate_workers, + a.industry_professional_management_admin_workers, + a.industry_education_health_social_workers, + a.industry_arts_entertainment_food_workers, + a.industry_other_services_workers, + a.industry_public_administration_workers, + a.primary_industry, + a.primary_industry_workers, + a.primary_industry_pct, + t.geom::geometry(MultiPolygon, 4326) as geom + from {BOUNDARY_STAGE_TABLE} t + join dc_zctas d on d.geoid = t.zcta5ce20 + left join {ACS_STAGE_TABLE} a on a.geoid = t.zcta5ce20 + """ + ) + cur.execute(f"alter table {FINAL_TABLE} add primary key (geoid)") + cur.execute( + f"create index data_center_zcta_2024_geom_gix on {FINAL_TABLE} using gist (geom)" + ) + cur.execute( + f"create index data_center_zcta_2024_dc_count_idx on {FINAL_TABLE} (data_center_count desc)" + ) + cur.execute( + f""" + comment on table {FINAL_TABLE} is + 'ZIP Code Tabulation Areas (2020 boundary vintage) containing records from public.master_data_centers (curated + OSM merged), enriched with ACS 2024 5-year profile demographics and derived primary industry fields. Companion table to data_center_census_tracts_2024 at coarser ZCTA geography.' + """ + ) + cur.execute(f"analyze {FINAL_TABLE}") + + +def assign_point_zcta_geoids(conn): + with conn: + with conn.cursor() as cur: + cur.execute( + f"alter table {POINT_TABLE} add column if not exists {POINT_ZCTA_COL} text" + ) + cur.execute( + f""" + update {POINT_TABLE} dc + set {POINT_ZCTA_COL} = matched.geoid + from ( + select + dc_inner.{POINT_ID_COL} as point_id, + ( + select t.zcta5ce20 + from {BOUNDARY_STAGE_TABLE} t + where t.geom && dc_inner.geom + and st_covers(t.geom, dc_inner.geom) + order by t.zcta5ce20 + limit 1 + ) as geoid + from {POINT_TABLE} dc_inner + ) matched + where dc.{POINT_ID_COL} = matched.point_id + """ + ) + cur.execute( + f"create index if not exists master_data_centers_zcta_geoid_idx on {POINT_TABLE} ({POINT_ZCTA_COL})" + ) + cur.execute(f"analyze {POINT_TABLE}") + + +def validate(conn): + with conn.cursor() as cur: + cur.execute( + f""" + select + count(*)::integer as zcta_rows, + coalesce(sum(data_center_count), 0)::integer as assigned_data_centers, + count(*) filter (where geom is not null)::integer as geom_rows + from {FINAL_TABLE} + """ + ) + summary = cur.fetchone() + cur.execute(f"select count(*)::integer from {POINT_TABLE}") + total_points = cur.fetchone()[0] + cur.execute( + f""" + select count(*)::integer + from {POINT_TABLE} + where {POINT_ZCTA_COL} is null + """ + ) + unassigned_points = cur.fetchone()[0] + cur.execute( + f""" + select count(*)::integer + from {FINAL_TABLE} + where population is null + """ + ) + missing_acs = cur.fetchone()[0] + return summary, total_points, unassigned_points, missing_acs + + +def main(): + parser = argparse.ArgumentParser( + description="Build ZCTA-level enrichment table for data-center points." + ) + parser.add_argument( + "--replace-final", + action="store_true", + help="Drop and rebuild the final ZCTA table if it already exists.", + ) + args = parser.parse_args() + + conn = connect() + try: + if args.replace_final: + drop_final_table_if_exists(conn) + else: + ensure_final_table_absent(conn) + finally: + conn.close() + + download_zcta_boundaries() + import_zcta_boundaries() + acs_rows, acs_fieldnames = fetch_acs() + + conn = connect() + try: + if args.replace_final: + drop_final_table_if_exists(conn) + else: + ensure_final_table_absent(conn) + load_acs_stage(conn, acs_rows, acs_fieldnames) + create_final_table(conn) + assign_point_zcta_geoids(conn) + summary, total_points, unassigned_points, missing_acs = validate(conn) + finally: + conn.close() + + print(f"loaded {len(acs_rows)} ACS ZCTA rows into {ACS_STAGE_TABLE}") + print(f"created {FINAL_TABLE}") + print( + "zcta_rows={0} assigned_data_centers={1} geom_rows={2} source_points={3}".format( + summary[0], summary[1], summary[2], total_points + ) + ) + print(f"points_unassigned_to_zcta={unassigned_points}") + print(f"zctas_missing_acs_population={missing_acs}") + + +if __name__ == "__main__": + main()