Files
data-centers/scripts/create_data_center_census_tract_table.py
dadams ee5856661a Reorganize project into scripts/, docs/, data/, output/ directories
Move all Python scripts to scripts/, documentation to docs/, raw input
data to data/, and generated HTML/CSV outputs to output/. Update path
references in 8 scripts to use Path(__file__).parent.parent as project
root so they work correctly from the new location. Update README links
and quick-start commands accordingly. Notebooks remain at root.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 21:57:47 -07:00

732 lines
26 KiB
Python

#!/usr/bin/env python3
import argparse
import csv
import json
import os
import subprocess
import urllib.parse
import urllib.request
from decimal import Decimal
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_values
DB_NAME = "data_centers"
POINT_TABLE = "public.master_data_centers"
POINT_ID_COL = "master_id"
BOUNDARY_STAGE_TABLE = "public._dc_census_tract_boundaries_2024"
ACS_STAGE_TABLE = "public._dc_census_tract_acs_2024"
FINAL_TABLE = "public.data_center_census_tracts_2024"
ACS_YEAR = 2024
ACS_SOURCE = "ACS 2024 5-year profile"
TRACT_ZIP = Path("cb_2024_us_tract_500k.zip")
TRACT_ZIP_URL = (
"https://www2.census.gov/geo/tiger/GENZ2024/shp/cb_2024_us_tract_500k.zip"
)
ACS_AUDIT_CSV = Path("census_tract_acs_2024_selected_states.csv")
STATE_NAME_TO_CODE = {
"Alabama": "AL", "Alaska": "AK", "Arizona": "AZ", "Arkansas": "AR",
"California": "CA", "Colorado": "CO", "Connecticut": "CT", "Delaware": "DE",
"District of Columbia": "DC", "Florida": "FL", "Georgia": "GA", "Hawaii": "HI",
"Idaho": "ID", "Illinois": "IL", "Indiana": "IN", "Iowa": "IA",
"Kansas": "KS", "Kentucky": "KY", "Louisiana": "LA", "Maine": "ME",
"Maryland": "MD", "Massachusetts": "MA", "Michigan": "MI", "Minnesota": "MN",
"Mississippi": "MS", "Missouri": "MO", "Montana": "MT", "Nebraska": "NE",
"Nevada": "NV", "New Hampshire": "NH", "New Jersey": "NJ", "New Mexico": "NM",
"New York": "NY", "North Carolina": "NC", "North Dakota": "ND", "Ohio": "OH",
"Oklahoma": "OK", "Oregon": "OR", "Pennsylvania": "PA", "Rhode Island": "RI",
"South Carolina": "SC", "South Dakota": "SD", "Tennessee": "TN", "Texas": "TX",
"Utah": "UT", "Vermont": "VT", "Virginia": "VA", "Washington": "WA",
"West Virginia": "WV", "Wisconsin": "WI", "Wyoming": "WY",
"American Samoa": "AS", "Guam": "GU", "Northern Mariana Islands": "MP",
"Puerto Rico": "PR", "United States Virgin Islands": "VI",
"U.S. Virgin Islands": "VI", "Virgin Islands": "VI",
}
STATE_FIPS = {
"AL": "01",
"AK": "02",
"AZ": "04",
"AR": "05",
"CA": "06",
"CO": "08",
"CT": "09",
"DE": "10",
"DC": "11",
"FL": "12",
"GA": "13",
"HI": "15",
"ID": "16",
"IL": "17",
"IN": "18",
"IA": "19",
"KS": "20",
"KY": "21",
"LA": "22",
"ME": "23",
"MD": "24",
"MA": "25",
"MI": "26",
"MN": "27",
"MS": "28",
"MO": "29",
"MT": "30",
"NE": "31",
"NV": "32",
"NH": "33",
"NJ": "34",
"NM": "35",
"NY": "36",
"NC": "37",
"ND": "38",
"OH": "39",
"OK": "40",
"OR": "41",
"PA": "42",
"RI": "44",
"SC": "45",
"SD": "46",
"TN": "47",
"TX": "48",
"UT": "49",
"VT": "50",
"VA": "51",
"WA": "53",
"WV": "54",
"WI": "55",
"WY": "56",
"AS": "60",
"GU": "66",
"MP": "69",
"PR": "72",
"VI": "78",
}
ACS_VARIABLES = {
"DP05_0001E": "population",
"DP05_0018E": "median_age",
"DP02_0001E": "households",
"DP02_0016E": "avg_household_size",
"DP02_0067PE": "high_school_or_higher_pct",
"DP02_0068PE": "bachelor_or_higher_pct",
"DP02_0154PE": "broadband_subscription_pct",
"DP03_0001E": "population_16_over",
"DP03_0002E": "labor_force",
"DP03_0005E": "unemployed",
"DP03_0009PE": "unemployment_rate",
"DP03_0032E": "industry_total_workers",
"DP03_0033E": "industry_agriculture_mining_workers",
"DP03_0034E": "industry_construction_workers",
"DP03_0035E": "industry_manufacturing_workers",
"DP03_0036E": "industry_wholesale_trade_workers",
"DP03_0037E": "industry_retail_trade_workers",
"DP03_0038E": "industry_transportation_warehousing_utilities_workers",
"DP03_0039E": "industry_information_workers",
"DP03_0040E": "industry_finance_real_estate_workers",
"DP03_0041E": "industry_professional_management_admin_workers",
"DP03_0042E": "industry_education_health_social_workers",
"DP03_0043E": "industry_arts_entertainment_food_workers",
"DP03_0044E": "industry_other_services_workers",
"DP03_0045E": "industry_public_administration_workers",
"DP03_0062E": "median_household_income",
"DP03_0088E": "per_capita_income",
"DP03_0119PE": "family_poverty_rate",
"DP03_0128PE": "poverty_rate",
"DP05_0090E": "hispanic_latino_population",
"DP05_0090PE": "hispanic_latino_pct",
"DP05_0096E": "non_hispanic_white_population",
"DP05_0096PE": "non_hispanic_white_pct",
"DP05_0097E": "non_hispanic_black_population",
"DP05_0097PE": "non_hispanic_black_pct",
"DP05_0099E": "non_hispanic_asian_population",
"DP05_0099PE": "non_hispanic_asian_pct",
}
COUNT_COLUMNS = {
"population",
"households",
"population_16_over",
"labor_force",
"unemployed",
"industry_total_workers",
"industry_agriculture_mining_workers",
"industry_construction_workers",
"industry_manufacturing_workers",
"industry_wholesale_trade_workers",
"industry_retail_trade_workers",
"industry_transportation_warehousing_utilities_workers",
"industry_information_workers",
"industry_finance_real_estate_workers",
"industry_professional_management_admin_workers",
"industry_education_health_social_workers",
"industry_arts_entertainment_food_workers",
"industry_other_services_workers",
"industry_public_administration_workers",
"median_household_income",
"per_capita_income",
"hispanic_latino_population",
"non_hispanic_white_population",
"non_hispanic_black_population",
"non_hispanic_asian_population",
}
NUMERIC_COLUMNS = set(ACS_VARIABLES.values()) - COUNT_COLUMNS
INDUSTRY_COLUMNS = {
"industry_agriculture_mining_workers": "Agriculture, forestry, fishing and hunting, and mining",
"industry_construction_workers": "Construction",
"industry_manufacturing_workers": "Manufacturing",
"industry_wholesale_trade_workers": "Wholesale trade",
"industry_retail_trade_workers": "Retail trade",
"industry_transportation_warehousing_utilities_workers": "Transportation and warehousing, and utilities",
"industry_information_workers": "Information",
"industry_finance_real_estate_workers": "Finance and insurance, and real estate and rental and leasing",
"industry_professional_management_admin_workers": "Professional, scientific, management, administrative, and waste management services",
"industry_education_health_social_workers": "Educational services, and health care and social assistance",
"industry_arts_entertainment_food_workers": "Arts, entertainment, recreation, accommodation, and food services",
"industry_other_services_workers": "Other services, except public administration",
"industry_public_administration_workers": "Public administration",
}
SPECIAL_VALUES = {
"-666666666",
"-888888888",
"-999999999",
"-222222222",
"-333333333",
"-555555555",
"-666666666.0",
"-888888888.0",
"-999999999.0",
"-222222222.0",
"-333333333.0",
"-555555555.0",
}
def connect():
return psycopg2.connect(
host=os.environ["PGWEB_HOST"],
port=os.environ["PGWEB_PORT"],
user=os.environ["PGWEB_USER"],
password=os.environ["PGWEB_PASSWORD"],
dbname=DB_NAME,
)
def normalize_state(value):
if value in (None, ""):
return None
if value in STATE_FIPS:
return value
return STATE_NAME_TO_CODE.get(value.strip())
def get_state_fips(conn):
with conn.cursor() as cur:
cur.execute(
f"select state, count(*) from {POINT_TABLE} group by state order by state nulls last"
)
rows = cur.fetchall()
normalized_counts = {}
null_state_count = 0
unknown = []
for raw, count in rows:
if raw is None:
null_state_count += count
continue
code = normalize_state(raw)
if code is None:
unknown.append((raw, count))
continue
normalized_counts[code] = normalized_counts.get(code, 0) + count
if unknown:
details = ", ".join(f"{repr(name)}({n})" for name, n in unknown)
raise RuntimeError(f"Unrecognized state values in {POINT_TABLE}: {details}")
if null_state_count:
print(
f"warning: {null_state_count} master_data_centers rows have NULL state; "
f"importing tract boundaries for all 50 states + DC + PR so spatial join can resolve them."
)
# Census ACS 5-year DP profile lacks coverage for the small island territories;
# restrict to the 50 states + DC + PR which the ACS profile reliably serves.
allowed = {"AS", "GU", "MP", "VI"}
return sorted({fips for code, fips in STATE_FIPS.items() if code not in allowed})
return sorted({STATE_FIPS[code] for code in normalized_counts})
def ensure_final_table_absent(conn):
with conn.cursor() as cur:
cur.execute("select to_regclass(%s)", (FINAL_TABLE,))
if cur.fetchone()[0] is not None:
raise RuntimeError(
f"Target table {FINAL_TABLE} already exists; refusing to overwrite it."
)
def drop_final_table_if_exists(conn):
with conn:
with conn.cursor() as cur:
cur.execute(f"drop table if exists {FINAL_TABLE}")
def download_tract_boundaries():
if TRACT_ZIP.exists() and TRACT_ZIP.stat().st_size > 50_000_000:
return
tmp_path = TRACT_ZIP.with_suffix(".zip.part")
with urllib.request.urlopen(TRACT_ZIP_URL, timeout=120) as response:
with tmp_path.open("wb") as out:
while True:
chunk = response.read(1024 * 1024)
if not chunk:
break
out.write(chunk)
tmp_path.rename(TRACT_ZIP)
def import_tract_boundaries(state_fips):
where = "STATEFP IN ({})".format(
",".join(f"'{state}'" for state in sorted(state_fips))
)
env = os.environ.copy()
env.update(
{
"PGHOST": os.environ["PGWEB_HOST"],
"PGPORT": os.environ["PGWEB_PORT"],
"PGUSER": os.environ["PGWEB_USER"],
"PGPASSWORD": os.environ["PGWEB_PASSWORD"],
"PGDATABASE": DB_NAME,
}
)
cmd = [
"ogr2ogr",
"-f",
"PostgreSQL",
"PG:dbname=data_centers",
f"/vsizip/{TRACT_ZIP.resolve()}/cb_2024_us_tract_500k.shp",
"-nln",
BOUNDARY_STAGE_TABLE,
"-overwrite",
"-nlt",
"MULTIPOLYGON",
"-t_srs",
"EPSG:4326",
"-lco",
"GEOMETRY_NAME=geom",
"-lco",
"FID=gid",
"-where",
where,
]
subprocess.run(cmd, check=True, env=env)
def fetch_acs_for_state(state_fips):
variables = ["NAME", *ACS_VARIABLES.keys()]
params = {
"get": ",".join(variables),
"for": "tract:*",
"in": f"state:{state_fips} county:*",
}
api_key = os.environ.get("CENSUS_API_KEY")
if api_key:
params["key"] = api_key
url = (
f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?"
+ urllib.parse.urlencode(params)
)
try:
with urllib.request.urlopen(url, timeout=120) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(
f"Census ACS request failed for state {state_fips}: HTTP {exc.code}{body[:300]}"
) from exc
try:
data = json.loads(body)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"Census ACS returned non-JSON for state {state_fips}: {body[:300]}"
) from exc
header = data[0]
rows = []
for values in data[1:]:
raw = dict(zip(header, values))
row = {
"geoid": raw["state"] + raw["county"] + raw["tract"],
"acs_name": raw["NAME"],
"statefp": raw["state"],
"countyfp": raw["county"],
"tractce": raw["tract"],
}
for acs_var, column in ACS_VARIABLES.items():
row[column] = clean_acs_value(raw.get(acs_var), column)
add_primary_industry(row)
rows.append(row)
return rows
def clean_acs_value(value, column):
if value in (None, "", "null") or value in SPECIAL_VALUES:
return None
if column in COUNT_COLUMNS:
return int(Decimal(value))
if column in NUMERIC_COLUMNS:
return Decimal(value)
return value
def add_primary_industry(row):
industry_total = row.get("industry_total_workers")
best_column = None
best_value = None
for column in INDUSTRY_COLUMNS:
value = row.get(column)
if value is None:
continue
if best_value is None or value > best_value:
best_column = column
best_value = value
row["primary_industry"] = INDUSTRY_COLUMNS.get(best_column)
row["primary_industry_workers"] = best_value
if industry_total and best_value is not None:
row["primary_industry_pct"] = Decimal(best_value * 100) / Decimal(industry_total)
else:
row["primary_industry_pct"] = None
def fetch_acs(state_fips):
rows = []
for state in state_fips:
rows.extend(fetch_acs_for_state(state))
fieldnames = [
"geoid",
"acs_name",
"statefp",
"countyfp",
"tractce",
*ACS_VARIABLES.values(),
"primary_industry",
"primary_industry_workers",
"primary_industry_pct",
]
with ACS_AUDIT_CSV.open("w", newline="", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
return rows, fieldnames
def load_acs_stage(conn, rows, fieldnames):
with conn:
with conn.cursor() as cur:
cur.execute(f"drop table if exists {ACS_STAGE_TABLE}")
cur.execute(
f"""
create table {ACS_STAGE_TABLE} (
geoid text primary key,
acs_name text,
statefp text,
countyfp text,
tractce text,
population integer,
median_age numeric,
households integer,
avg_household_size numeric,
high_school_or_higher_pct numeric,
bachelor_or_higher_pct numeric,
broadband_subscription_pct numeric,
population_16_over integer,
labor_force integer,
unemployed integer,
unemployment_rate numeric,
industry_total_workers integer,
industry_agriculture_mining_workers integer,
industry_construction_workers integer,
industry_manufacturing_workers integer,
industry_wholesale_trade_workers integer,
industry_retail_trade_workers integer,
industry_transportation_warehousing_utilities_workers integer,
industry_information_workers integer,
industry_finance_real_estate_workers integer,
industry_professional_management_admin_workers integer,
industry_education_health_social_workers integer,
industry_arts_entertainment_food_workers integer,
industry_other_services_workers integer,
industry_public_administration_workers integer,
median_household_income integer,
per_capita_income integer,
family_poverty_rate numeric,
poverty_rate numeric,
hispanic_latino_population integer,
hispanic_latino_pct numeric,
non_hispanic_white_population integer,
non_hispanic_white_pct numeric,
non_hispanic_black_population integer,
non_hispanic_black_pct numeric,
non_hispanic_asian_population integer,
non_hispanic_asian_pct numeric,
primary_industry text,
primary_industry_workers integer,
primary_industry_pct numeric
)
"""
)
values = [tuple(row.get(column) for column in fieldnames) for row in rows]
execute_values(
cur,
f"insert into {ACS_STAGE_TABLE} ({', '.join(fieldnames)}) values %s",
values,
page_size=1000,
)
cur.execute(f"analyze {ACS_STAGE_TABLE}")
def create_final_table(conn):
with conn:
with conn.cursor() as cur:
cur.execute("drop index if exists _dc_census_tract_boundaries_2024_geom_gix")
cur.execute(
f"create index _dc_census_tract_boundaries_2024_geom_gix on {BOUNDARY_STAGE_TABLE} using gist (geom)"
)
cur.execute(f"analyze {BOUNDARY_STAGE_TABLE}")
cur.execute(
f"""
create table {FINAL_TABLE} as
with dc_tracts as (
select
t.geoid,
count(*)::integer as data_center_count,
count(*) filter (where dc.source = 'curated')::integer
as curated_only_data_center_count,
count(*) filter (where dc.source = 'merged')::integer
as merged_data_center_count,
count(*) filter (where dc.source = 'osm')::integer
as osm_only_data_center_count,
array_agg(dc.{POINT_ID_COL} order by dc.{POINT_ID_COL}) as data_center_ids,
array_agg(distinct dc.operator) filter (where dc.operator is not null)
as operators
from {BOUNDARY_STAGE_TABLE} t
join {POINT_TABLE} dc
on t.geom && dc.geom
and ST_Covers(t.geom, dc.geom)
group by t.geoid
)
select
t.geoid,
t.statefp,
t.countyfp,
t.tractce,
t.name as tract_name,
t.namelsad,
t.aland::bigint as land_area_sqm,
t.awater::bigint as water_area_sqm,
{ACS_YEAR}::integer as acs_year,
'{ACS_SOURCE}'::text as acs_source,
a.acs_name,
d.data_center_count,
d.curated_only_data_center_count,
d.merged_data_center_count,
d.osm_only_data_center_count,
d.data_center_ids,
d.operators,
a.population,
a.median_age,
a.households,
a.avg_household_size,
a.high_school_or_higher_pct,
a.bachelor_or_higher_pct,
a.broadband_subscription_pct,
a.population_16_over,
a.labor_force,
a.unemployed,
a.unemployment_rate,
a.median_household_income,
a.per_capita_income,
a.family_poverty_rate,
a.poverty_rate,
a.hispanic_latino_population,
a.hispanic_latino_pct,
a.non_hispanic_white_population,
a.non_hispanic_white_pct,
a.non_hispanic_black_population,
a.non_hispanic_black_pct,
a.non_hispanic_asian_population,
a.non_hispanic_asian_pct,
a.industry_total_workers,
a.industry_agriculture_mining_workers,
a.industry_construction_workers,
a.industry_manufacturing_workers,
a.industry_wholesale_trade_workers,
a.industry_retail_trade_workers,
a.industry_transportation_warehousing_utilities_workers,
a.industry_information_workers,
a.industry_finance_real_estate_workers,
a.industry_professional_management_admin_workers,
a.industry_education_health_social_workers,
a.industry_arts_entertainment_food_workers,
a.industry_other_services_workers,
a.industry_public_administration_workers,
a.primary_industry,
a.primary_industry_workers,
a.primary_industry_pct,
t.geom::geometry(MultiPolygon, 4326) as geom
from {BOUNDARY_STAGE_TABLE} t
join dc_tracts d on d.geoid = t.geoid
left join {ACS_STAGE_TABLE} a on a.geoid = t.geoid
"""
)
cur.execute(f"alter table {FINAL_TABLE} add primary key (geoid)")
cur.execute(
f"create index data_center_census_tracts_2024_geom_gix on {FINAL_TABLE} using gist (geom)"
)
cur.execute(
f"create index data_center_census_tracts_2024_state_county_idx on {FINAL_TABLE} (statefp, countyfp)"
)
cur.execute(
f"create index data_center_census_tracts_2024_dc_count_idx on {FINAL_TABLE} (data_center_count desc)"
)
cur.execute(
f"""
comment on table {FINAL_TABLE} is
'Census tracts containing records from public.master_data_centers (curated + OSM merged), enriched with ACS 2024 5-year profile demographics and derived primary industry fields.'
"""
)
cur.execute(f"analyze {FINAL_TABLE}")
def assign_point_geoids(conn):
with conn:
with conn.cursor() as cur:
cur.execute(
f"alter table {POINT_TABLE} add column if not exists geoid text"
)
cur.execute(
f"""
update {POINT_TABLE} dc
set geoid = matched.geoid
from (
select
dc_inner.{POINT_ID_COL} as point_id,
(
select t.geoid
from {BOUNDARY_STAGE_TABLE} t
where t.geom && dc_inner.geom
and st_covers(t.geom, dc_inner.geom)
order by t.geoid
limit 1
) as geoid
from {POINT_TABLE} dc_inner
) matched
where dc.{POINT_ID_COL} = matched.point_id
"""
)
cur.execute(
f"create index if not exists master_data_centers_geoid_idx on {POINT_TABLE} (geoid)"
)
cur.execute(f"analyze {POINT_TABLE}")
def validate(conn):
with conn.cursor() as cur:
cur.execute(
f"""
select
count(*)::integer as tract_rows,
coalesce(sum(data_center_count), 0)::integer as assigned_data_centers,
count(*) filter (where geom is not null)::integer as geom_rows
from {FINAL_TABLE}
"""
)
summary = cur.fetchone()
cur.execute(f"select count(*)::integer from {POINT_TABLE}")
total_points = cur.fetchone()[0]
cur.execute(
f"""
select source, count(*)::integer
from {POINT_TABLE}
group by source
order by source
"""
)
point_source_breakdown = cur.fetchall()
cur.execute(
f"""
select count(*)::integer
from {POINT_TABLE}
where geoid is null
"""
)
unassigned_points = cur.fetchone()[0]
cur.execute(
f"""
select count(*)::integer
from {FINAL_TABLE}
where population is null
"""
)
missing_acs = cur.fetchone()[0]
return summary, total_points, point_source_breakdown, unassigned_points, missing_acs
def main():
parser = argparse.ArgumentParser(
description="Build census-tract enrichment table for data-center points."
)
parser.add_argument(
"--replace-final",
action="store_true",
help="Drop and rebuild the final tract table if it already exists.",
)
args = parser.parse_args()
conn = connect()
try:
state_fips = get_state_fips(conn)
if args.replace_final:
drop_final_table_if_exists(conn)
else:
ensure_final_table_absent(conn)
finally:
conn.close()
download_tract_boundaries()
import_tract_boundaries(state_fips)
acs_rows, acs_fieldnames = fetch_acs(state_fips)
conn = connect()
try:
if args.replace_final:
drop_final_table_if_exists(conn)
else:
ensure_final_table_absent(conn)
load_acs_stage(conn, acs_rows, acs_fieldnames)
create_final_table(conn)
assign_point_geoids(conn)
summary, total_points, point_source_breakdown, unassigned_points, missing_acs = validate(conn)
finally:
conn.close()
print(f"loaded {len(acs_rows)} ACS tract rows into {ACS_STAGE_TABLE}")
print(f"created {FINAL_TABLE}")
print(
"tract_rows={0} assigned_data_centers={1} geom_rows={2} source_points={3}".format(
summary[0], summary[1], summary[2], total_points
)
)
print("point_source=" + ", ".join(f"{k}:{v}" for k, v in point_source_breakdown))
print(f"points_unassigned_to_tract={unassigned_points}")
print(f"tracts_missing_acs_population={missing_acs}")
if __name__ == "__main__":
main()