Files
data-centers/scripts/create_data_center_zcta_table.py
dadams 6cb7c0334c Add ZCTA-level enrichment table build script
Mirrors create_data_center_census_tract_table.py but at ZIP Code
Tabulation Area geography (2020 boundary vintage, since ZCTAs are only
redrawn each decennial census). Builds data_center_zcta_2024 (607 ZCTAs
hosting >=1 facility, joined to ACS 2024 5-year demographics) and adds
master_data_centers.zcta_geoid, parallel to the existing tract geoid
column. Used to verify the income/education premium for DC host
communities holds at ZIP-code resolution, not just census-tract
resolution, for the dc-siting-politics paper.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-20 20:59:19 -07:00

592 lines
22 KiB
Python

#!/usr/bin/env python3
"""Build a ZCTA-level enrichment table for data-center points, mirroring
create_data_center_census_tract_table.py but at ZIP Code Tabulation Area
geography. ZCTA cartographic boundaries are only redrawn each decennial
census, so the boundary vintage is 2020 (cb_2020_us_zcta520_500k) even
though the ACS demographic profile is the 2024 5-year release.
"""
import argparse
import csv
import json
import os
import subprocess
import urllib.parse
import urllib.request
from decimal import Decimal
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_values
DB_NAME = "data_centers"
POINT_TABLE = "public.master_data_centers"
POINT_ID_COL = "master_id"
POINT_ZCTA_COL = "zcta_geoid"
BOUNDARY_STAGE_TABLE = "public._dc_zcta_boundaries_2020"
ACS_STAGE_TABLE = "public._dc_zcta_acs_2024"
FINAL_TABLE = "public.data_center_zcta_2024"
ACS_YEAR = 2024
ACS_SOURCE = "ACS 2024 5-year profile"
ZCTA_BOUNDARY_VINTAGE = "2020 ZCTA5 cartographic boundary (cb_2020_us_zcta520_500k)"
PROJECT_ROOT = Path(__file__).parent.parent
ZCTA_ZIP = PROJECT_ROOT / "data" / "cb_2020_us_zcta520_500k.zip"
ZCTA_ZIP_URL = (
"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip"
)
ACS_AUDIT_CSV = PROJECT_ROOT / "data" / "zcta_acs_2024_national.csv"
# Same ACS Data Profile variables as the tract table, so the two enrichment
# tables stay directly comparable.
ACS_VARIABLES = {
"DP05_0001E": "population",
"DP05_0018E": "median_age",
"DP02_0001E": "households",
"DP02_0016E": "avg_household_size",
"DP02_0067PE": "high_school_or_higher_pct",
"DP02_0068PE": "bachelor_or_higher_pct",
"DP02_0154PE": "broadband_subscription_pct",
"DP03_0001E": "population_16_over",
"DP03_0002E": "labor_force",
"DP03_0005E": "unemployed",
"DP03_0009PE": "unemployment_rate",
"DP03_0032E": "industry_total_workers",
"DP03_0033E": "industry_agriculture_mining_workers",
"DP03_0034E": "industry_construction_workers",
"DP03_0035E": "industry_manufacturing_workers",
"DP03_0036E": "industry_wholesale_trade_workers",
"DP03_0037E": "industry_retail_trade_workers",
"DP03_0038E": "industry_transportation_warehousing_utilities_workers",
"DP03_0039E": "industry_information_workers",
"DP03_0040E": "industry_finance_real_estate_workers",
"DP03_0041E": "industry_professional_management_admin_workers",
"DP03_0042E": "industry_education_health_social_workers",
"DP03_0043E": "industry_arts_entertainment_food_workers",
"DP03_0044E": "industry_other_services_workers",
"DP03_0045E": "industry_public_administration_workers",
"DP03_0062E": "median_household_income",
"DP03_0088E": "per_capita_income",
"DP03_0119PE": "family_poverty_rate",
"DP03_0128PE": "poverty_rate",
"DP05_0090E": "hispanic_latino_population",
"DP05_0090PE": "hispanic_latino_pct",
"DP05_0096E": "non_hispanic_white_population",
"DP05_0096PE": "non_hispanic_white_pct",
"DP05_0097E": "non_hispanic_black_population",
"DP05_0097PE": "non_hispanic_black_pct",
"DP05_0099E": "non_hispanic_asian_population",
"DP05_0099PE": "non_hispanic_asian_pct",
}
COUNT_COLUMNS = {
"population",
"households",
"population_16_over",
"labor_force",
"unemployed",
"industry_total_workers",
"industry_agriculture_mining_workers",
"industry_construction_workers",
"industry_manufacturing_workers",
"industry_wholesale_trade_workers",
"industry_retail_trade_workers",
"industry_transportation_warehousing_utilities_workers",
"industry_information_workers",
"industry_finance_real_estate_workers",
"industry_professional_management_admin_workers",
"industry_education_health_social_workers",
"industry_arts_entertainment_food_workers",
"industry_other_services_workers",
"industry_public_administration_workers",
"median_household_income",
"per_capita_income",
"hispanic_latino_population",
"non_hispanic_white_population",
"non_hispanic_black_population",
"non_hispanic_asian_population",
}
NUMERIC_COLUMNS = set(ACS_VARIABLES.values()) - COUNT_COLUMNS
INDUSTRY_COLUMNS = {
"industry_agriculture_mining_workers": "Agriculture, forestry, fishing and hunting, and mining",
"industry_construction_workers": "Construction",
"industry_manufacturing_workers": "Manufacturing",
"industry_wholesale_trade_workers": "Wholesale trade",
"industry_retail_trade_workers": "Retail trade",
"industry_transportation_warehousing_utilities_workers": "Transportation and warehousing, and utilities",
"industry_information_workers": "Information",
"industry_finance_real_estate_workers": "Finance and insurance, and real estate and rental and leasing",
"industry_professional_management_admin_workers": "Professional, scientific, management, administrative, and waste management services",
"industry_education_health_social_workers": "Educational services, and health care and social assistance",
"industry_arts_entertainment_food_workers": "Arts, entertainment, recreation, accommodation, and food services",
"industry_other_services_workers": "Other services, except public administration",
"industry_public_administration_workers": "Public administration",
}
SPECIAL_VALUES = {
"-666666666",
"-888888888",
"-999999999",
"-222222222",
"-333333333",
"-555555555",
"-666666666.0",
"-888888888.0",
"-999999999.0",
"-222222222.0",
"-333333333.0",
"-555555555.0",
}
def connect():
return psycopg2.connect(
host=os.environ["PGWEB_HOST"],
port=os.environ["PGWEB_PORT"],
user=os.environ["PGWEB_USER"],
password=os.environ["PGWEB_PASSWORD"],
dbname=DB_NAME,
)
def ensure_final_table_absent(conn):
with conn.cursor() as cur:
cur.execute("select to_regclass(%s)", (FINAL_TABLE,))
if cur.fetchone()[0] is not None:
raise RuntimeError(
f"Target table {FINAL_TABLE} already exists; refusing to overwrite it."
)
def drop_final_table_if_exists(conn):
with conn:
with conn.cursor() as cur:
cur.execute(f"drop table if exists {FINAL_TABLE}")
def download_zcta_boundaries():
if ZCTA_ZIP.exists() and ZCTA_ZIP.stat().st_size > 30_000_000:
return
tmp_path = ZCTA_ZIP.with_suffix(".zip.part")
with urllib.request.urlopen(ZCTA_ZIP_URL, timeout=180) as response:
with tmp_path.open("wb") as out:
while True:
chunk = response.read(1024 * 1024)
if not chunk:
break
out.write(chunk)
tmp_path.rename(ZCTA_ZIP)
def import_zcta_boundaries():
env = os.environ.copy()
env.update(
{
"PGHOST": os.environ["PGWEB_HOST"],
"PGPORT": os.environ["PGWEB_PORT"],
"PGUSER": os.environ["PGWEB_USER"],
"PGPASSWORD": os.environ["PGWEB_PASSWORD"],
"PGDATABASE": DB_NAME,
}
)
cmd = [
"ogr2ogr",
"-f",
"PostgreSQL",
"PG:dbname=data_centers",
f"/vsizip/{ZCTA_ZIP.resolve()}/cb_2020_us_zcta520_500k.shp",
"-nln",
BOUNDARY_STAGE_TABLE,
"-overwrite",
"-nlt",
"MULTIPOLYGON",
"-t_srs",
"EPSG:4326",
"-lco",
"GEOMETRY_NAME=geom",
"-lco",
"FID=gid",
]
subprocess.run(cmd, check=True, env=env)
def fetch_acs_national():
"""ZCTAs are queried nationally in one call; they don't nest cleanly
inside a single state, so there's no per-state loop here."""
variables = ["NAME", *ACS_VARIABLES.keys()]
params = {
"get": ",".join(variables),
"for": "zip code tabulation area:*",
}
api_key = os.environ.get("CENSUS_API_KEY")
if api_key:
params["key"] = api_key
url = (
f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?"
+ urllib.parse.urlencode(params)
)
try:
with urllib.request.urlopen(url, timeout=180) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(
f"Census ACS request failed for ZCTA geography: HTTP {exc.code}{body[:300]}"
) from exc
try:
data = json.loads(body)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"Census ACS returned non-JSON for ZCTA geography: {body[:300]}"
) from exc
header = data[0]
rows = []
for values in data[1:]:
raw = dict(zip(header, values))
row = {
"geoid": raw["zip code tabulation area"],
"acs_name": raw["NAME"],
}
for acs_var, column in ACS_VARIABLES.items():
row[column] = clean_acs_value(raw.get(acs_var), column)
add_primary_industry(row)
rows.append(row)
return rows
def clean_acs_value(value, column):
if value in (None, "", "null") or value in SPECIAL_VALUES:
return None
if column in COUNT_COLUMNS:
return int(Decimal(value))
if column in NUMERIC_COLUMNS:
return Decimal(value)
return value
def add_primary_industry(row):
industry_total = row.get("industry_total_workers")
best_column = None
best_value = None
for column in INDUSTRY_COLUMNS:
value = row.get(column)
if value is None:
continue
if best_value is None or value > best_value:
best_column = column
best_value = value
row["primary_industry"] = INDUSTRY_COLUMNS.get(best_column)
row["primary_industry_workers"] = best_value
if industry_total and best_value is not None:
row["primary_industry_pct"] = Decimal(best_value * 100) / Decimal(industry_total)
else:
row["primary_industry_pct"] = None
def fetch_acs():
rows = fetch_acs_national()
fieldnames = [
"geoid",
"acs_name",
*ACS_VARIABLES.values(),
"primary_industry",
"primary_industry_workers",
"primary_industry_pct",
]
with ACS_AUDIT_CSV.open("w", newline="", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
return rows, fieldnames
def load_acs_stage(conn, rows, fieldnames):
with conn:
with conn.cursor() as cur:
cur.execute(f"drop table if exists {ACS_STAGE_TABLE}")
cur.execute(
f"""
create table {ACS_STAGE_TABLE} (
geoid text primary key,
acs_name text,
population integer,
median_age numeric,
households integer,
avg_household_size numeric,
high_school_or_higher_pct numeric,
bachelor_or_higher_pct numeric,
broadband_subscription_pct numeric,
population_16_over integer,
labor_force integer,
unemployed integer,
unemployment_rate numeric,
industry_total_workers integer,
industry_agriculture_mining_workers integer,
industry_construction_workers integer,
industry_manufacturing_workers integer,
industry_wholesale_trade_workers integer,
industry_retail_trade_workers integer,
industry_transportation_warehousing_utilities_workers integer,
industry_information_workers integer,
industry_finance_real_estate_workers integer,
industry_professional_management_admin_workers integer,
industry_education_health_social_workers integer,
industry_arts_entertainment_food_workers integer,
industry_other_services_workers integer,
industry_public_administration_workers integer,
median_household_income integer,
per_capita_income integer,
family_poverty_rate numeric,
poverty_rate numeric,
hispanic_latino_population integer,
hispanic_latino_pct numeric,
non_hispanic_white_population integer,
non_hispanic_white_pct numeric,
non_hispanic_black_population integer,
non_hispanic_black_pct numeric,
non_hispanic_asian_population integer,
non_hispanic_asian_pct numeric,
primary_industry text,
primary_industry_workers integer,
primary_industry_pct numeric
)
"""
)
values = [tuple(row.get(column) for column in fieldnames) for row in rows]
execute_values(
cur,
f"insert into {ACS_STAGE_TABLE} ({', '.join(fieldnames)}) values %s",
values,
page_size=1000,
)
cur.execute(f"analyze {ACS_STAGE_TABLE}")
def create_final_table(conn):
with conn:
with conn.cursor() as cur:
cur.execute("drop index if exists _dc_zcta_boundaries_2020_geom_gix")
cur.execute(
f"create index _dc_zcta_boundaries_2020_geom_gix on {BOUNDARY_STAGE_TABLE} using gist (geom)"
)
cur.execute(f"analyze {BOUNDARY_STAGE_TABLE}")
cur.execute(
f"""
create table {FINAL_TABLE} as
with dc_zctas as (
select
t.zcta5ce20 as geoid,
count(*)::integer as data_center_count,
count(*) filter (where dc.source = 'curated')::integer
as curated_only_data_center_count,
count(*) filter (where dc.source = 'merged')::integer
as merged_data_center_count,
count(*) filter (where dc.source = 'osm')::integer
as osm_only_data_center_count,
array_agg(dc.{POINT_ID_COL} order by dc.{POINT_ID_COL}) as data_center_ids,
array_agg(distinct dc.operator) filter (where dc.operator is not null)
as operators
from {BOUNDARY_STAGE_TABLE} t
join {POINT_TABLE} dc
on t.geom && dc.geom
and ST_Covers(t.geom, dc.geom)
group by t.zcta5ce20
)
select
t.zcta5ce20 as geoid,
t.name20 as zcta_name,
t.aland20::bigint as land_area_sqm,
t.awater20::bigint as water_area_sqm,
{ACS_YEAR}::integer as acs_year,
'{ACS_SOURCE}'::text as acs_source,
'{ZCTA_BOUNDARY_VINTAGE}'::text as boundary_source,
a.acs_name,
d.data_center_count,
d.curated_only_data_center_count,
d.merged_data_center_count,
d.osm_only_data_center_count,
d.data_center_ids,
d.operators,
a.population,
a.median_age,
a.households,
a.avg_household_size,
a.high_school_or_higher_pct,
a.bachelor_or_higher_pct,
a.broadband_subscription_pct,
a.population_16_over,
a.labor_force,
a.unemployed,
a.unemployment_rate,
a.median_household_income,
a.per_capita_income,
a.family_poverty_rate,
a.poverty_rate,
a.hispanic_latino_population,
a.hispanic_latino_pct,
a.non_hispanic_white_population,
a.non_hispanic_white_pct,
a.non_hispanic_black_population,
a.non_hispanic_black_pct,
a.non_hispanic_asian_population,
a.non_hispanic_asian_pct,
a.industry_total_workers,
a.industry_agriculture_mining_workers,
a.industry_construction_workers,
a.industry_manufacturing_workers,
a.industry_wholesale_trade_workers,
a.industry_retail_trade_workers,
a.industry_transportation_warehousing_utilities_workers,
a.industry_information_workers,
a.industry_finance_real_estate_workers,
a.industry_professional_management_admin_workers,
a.industry_education_health_social_workers,
a.industry_arts_entertainment_food_workers,
a.industry_other_services_workers,
a.industry_public_administration_workers,
a.primary_industry,
a.primary_industry_workers,
a.primary_industry_pct,
t.geom::geometry(MultiPolygon, 4326) as geom
from {BOUNDARY_STAGE_TABLE} t
join dc_zctas d on d.geoid = t.zcta5ce20
left join {ACS_STAGE_TABLE} a on a.geoid = t.zcta5ce20
"""
)
cur.execute(f"alter table {FINAL_TABLE} add primary key (geoid)")
cur.execute(
f"create index data_center_zcta_2024_geom_gix on {FINAL_TABLE} using gist (geom)"
)
cur.execute(
f"create index data_center_zcta_2024_dc_count_idx on {FINAL_TABLE} (data_center_count desc)"
)
cur.execute(
f"""
comment on table {FINAL_TABLE} is
'ZIP Code Tabulation Areas (2020 boundary vintage) containing records from public.master_data_centers (curated + OSM merged), enriched with ACS 2024 5-year profile demographics and derived primary industry fields. Companion table to data_center_census_tracts_2024 at coarser ZCTA geography.'
"""
)
cur.execute(f"analyze {FINAL_TABLE}")
def assign_point_zcta_geoids(conn):
with conn:
with conn.cursor() as cur:
cur.execute(
f"alter table {POINT_TABLE} add column if not exists {POINT_ZCTA_COL} text"
)
cur.execute(
f"""
update {POINT_TABLE} dc
set {POINT_ZCTA_COL} = matched.geoid
from (
select
dc_inner.{POINT_ID_COL} as point_id,
(
select t.zcta5ce20
from {BOUNDARY_STAGE_TABLE} t
where t.geom && dc_inner.geom
and st_covers(t.geom, dc_inner.geom)
order by t.zcta5ce20
limit 1
) as geoid
from {POINT_TABLE} dc_inner
) matched
where dc.{POINT_ID_COL} = matched.point_id
"""
)
cur.execute(
f"create index if not exists master_data_centers_zcta_geoid_idx on {POINT_TABLE} ({POINT_ZCTA_COL})"
)
cur.execute(f"analyze {POINT_TABLE}")
def validate(conn):
with conn.cursor() as cur:
cur.execute(
f"""
select
count(*)::integer as zcta_rows,
coalesce(sum(data_center_count), 0)::integer as assigned_data_centers,
count(*) filter (where geom is not null)::integer as geom_rows
from {FINAL_TABLE}
"""
)
summary = cur.fetchone()
cur.execute(f"select count(*)::integer from {POINT_TABLE}")
total_points = cur.fetchone()[0]
cur.execute(
f"""
select count(*)::integer
from {POINT_TABLE}
where {POINT_ZCTA_COL} is null
"""
)
unassigned_points = cur.fetchone()[0]
cur.execute(
f"""
select count(*)::integer
from {FINAL_TABLE}
where population is null
"""
)
missing_acs = cur.fetchone()[0]
return summary, total_points, unassigned_points, missing_acs
def main():
parser = argparse.ArgumentParser(
description="Build ZCTA-level enrichment table for data-center points."
)
parser.add_argument(
"--replace-final",
action="store_true",
help="Drop and rebuild the final ZCTA table if it already exists.",
)
args = parser.parse_args()
conn = connect()
try:
if args.replace_final:
drop_final_table_if_exists(conn)
else:
ensure_final_table_absent(conn)
finally:
conn.close()
download_zcta_boundaries()
import_zcta_boundaries()
acs_rows, acs_fieldnames = fetch_acs()
conn = connect()
try:
if args.replace_final:
drop_final_table_if_exists(conn)
else:
ensure_final_table_absent(conn)
load_acs_stage(conn, acs_rows, acs_fieldnames)
create_final_table(conn)
assign_point_zcta_geoids(conn)
summary, total_points, unassigned_points, missing_acs = validate(conn)
finally:
conn.close()
print(f"loaded {len(acs_rows)} ACS ZCTA rows into {ACS_STAGE_TABLE}")
print(f"created {FINAL_TABLE}")
print(
"zcta_rows={0} assigned_data_centers={1} geom_rows={2} source_points={3}".format(
summary[0], summary[1], summary[2], total_points
)
)
print(f"points_unassigned_to_zcta={unassigned_points}")
print(f"zctas_missing_acs_population={missing_acs}")
if __name__ == "__main__":
main()