#!/usr/bin/env python3 import argparse import csv import json import os import subprocess import urllib.parse import urllib.request from decimal import Decimal from pathlib import Path import psycopg2 from psycopg2.extras import execute_values DB_NAME = "data_centers" POINT_TABLE = "public.us_dc_sample_geocoded" BOUNDARY_STAGE_TABLE = "public._dc_census_tract_boundaries_2024" ACS_STAGE_TABLE = "public._dc_census_tract_acs_2024" FINAL_TABLE = "public.data_center_census_tracts_2024" ACS_YEAR = 2024 ACS_SOURCE = "ACS 2024 5-year profile" TRACT_ZIP = Path("cb_2024_us_tract_500k.zip") TRACT_ZIP_URL = ( "https://www2.census.gov/geo/tiger/GENZ2024/shp/cb_2024_us_tract_500k.zip" ) ACS_AUDIT_CSV = Path("census_tract_acs_2024_selected_states.csv") STATE_FIPS = { "AL": "01", "AK": "02", "AZ": "04", "AR": "05", "CA": "06", "CO": "08", "CT": "09", "DE": "10", "DC": "11", "FL": "12", "GA": "13", "HI": "15", "ID": "16", "IL": "17", "IN": "18", "IA": "19", "KS": "20", "KY": "21", "LA": "22", "ME": "23", "MD": "24", "MA": "25", "MI": "26", "MN": "27", "MS": "28", "MO": "29", "MT": "30", "NE": "31", "NV": "32", "NH": "33", "NJ": "34", "NM": "35", "NY": "36", "NC": "37", "ND": "38", "OH": "39", "OK": "40", "OR": "41", "PA": "42", "RI": "44", "SC": "45", "SD": "46", "TN": "47", "TX": "48", "UT": "49", "VT": "50", "VA": "51", "WA": "53", "WV": "54", "WI": "55", "WY": "56", "AS": "60", "GU": "66", "MP": "69", "PR": "72", "VI": "78", } ACS_VARIABLES = { "DP05_0001E": "population", "DP05_0018E": "median_age", "DP02_0001E": "households", "DP02_0016E": "avg_household_size", "DP02_0067PE": "high_school_or_higher_pct", "DP02_0068PE": "bachelor_or_higher_pct", "DP02_0154PE": "broadband_subscription_pct", "DP03_0001E": "population_16_over", "DP03_0002E": "labor_force", "DP03_0005E": "unemployed", "DP03_0009PE": "unemployment_rate", "DP03_0032E": "industry_total_workers", "DP03_0033E": "industry_agriculture_mining_workers", "DP03_0034E": "industry_construction_workers", "DP03_0035E": "industry_manufacturing_workers", "DP03_0036E": "industry_wholesale_trade_workers", "DP03_0037E": "industry_retail_trade_workers", "DP03_0038E": "industry_transportation_warehousing_utilities_workers", "DP03_0039E": "industry_information_workers", "DP03_0040E": "industry_finance_real_estate_workers", "DP03_0041E": "industry_professional_management_admin_workers", "DP03_0042E": "industry_education_health_social_workers", "DP03_0043E": "industry_arts_entertainment_food_workers", "DP03_0044E": "industry_other_services_workers", "DP03_0045E": "industry_public_administration_workers", "DP03_0062E": "median_household_income", "DP03_0088E": "per_capita_income", "DP03_0119PE": "family_poverty_rate", "DP03_0128PE": "poverty_rate", "DP05_0090E": "hispanic_latino_population", "DP05_0090PE": "hispanic_latino_pct", "DP05_0096E": "non_hispanic_white_population", "DP05_0096PE": "non_hispanic_white_pct", "DP05_0097E": "non_hispanic_black_population", "DP05_0097PE": "non_hispanic_black_pct", "DP05_0099E": "non_hispanic_asian_population", "DP05_0099PE": "non_hispanic_asian_pct", } COUNT_COLUMNS = { "population", "households", "population_16_over", "labor_force", "unemployed", "industry_total_workers", "industry_agriculture_mining_workers", "industry_construction_workers", "industry_manufacturing_workers", "industry_wholesale_trade_workers", "industry_retail_trade_workers", "industry_transportation_warehousing_utilities_workers", "industry_information_workers", "industry_finance_real_estate_workers", "industry_professional_management_admin_workers", "industry_education_health_social_workers", "industry_arts_entertainment_food_workers", "industry_other_services_workers", "industry_public_administration_workers", "median_household_income", "per_capita_income", "hispanic_latino_population", "non_hispanic_white_population", "non_hispanic_black_population", "non_hispanic_asian_population", } NUMERIC_COLUMNS = set(ACS_VARIABLES.values()) - COUNT_COLUMNS INDUSTRY_COLUMNS = { "industry_agriculture_mining_workers": "Agriculture, forestry, fishing and hunting, and mining", "industry_construction_workers": "Construction", "industry_manufacturing_workers": "Manufacturing", "industry_wholesale_trade_workers": "Wholesale trade", "industry_retail_trade_workers": "Retail trade", "industry_transportation_warehousing_utilities_workers": "Transportation and warehousing, and utilities", "industry_information_workers": "Information", "industry_finance_real_estate_workers": "Finance and insurance, and real estate and rental and leasing", "industry_professional_management_admin_workers": "Professional, scientific, management, administrative, and waste management services", "industry_education_health_social_workers": "Educational services, and health care and social assistance", "industry_arts_entertainment_food_workers": "Arts, entertainment, recreation, accommodation, and food services", "industry_other_services_workers": "Other services, except public administration", "industry_public_administration_workers": "Public administration", } SPECIAL_VALUES = { "-666666666", "-888888888", "-999999999", "-222222222", "-333333333", "-555555555", "-666666666.0", "-888888888.0", "-999999999.0", "-222222222.0", "-333333333.0", "-555555555.0", } def connect(): return psycopg2.connect( host=os.environ["PGWEB_HOST"], port=os.environ["PGWEB_PORT"], user=os.environ["PGWEB_USER"], password=os.environ["PGWEB_PASSWORD"], dbname=DB_NAME, ) def get_state_fips(conn): with conn.cursor() as cur: cur.execute( f"select distinct state_code from {POINT_TABLE} order by state_code" ) state_codes = [row[0] for row in cur.fetchall()] missing = [code for code in state_codes if code not in STATE_FIPS] if missing: raise RuntimeError(f"Missing state FIPS mappings for: {', '.join(missing)}") return [STATE_FIPS[code] for code in state_codes] def ensure_final_table_absent(conn): with conn.cursor() as cur: cur.execute("select to_regclass(%s)", (FINAL_TABLE,)) if cur.fetchone()[0] is not None: raise RuntimeError( f"Target table {FINAL_TABLE} already exists; refusing to overwrite it." ) def drop_final_table_if_exists(conn): with conn: with conn.cursor() as cur: cur.execute(f"drop table if exists {FINAL_TABLE}") def download_tract_boundaries(): if TRACT_ZIP.exists() and TRACT_ZIP.stat().st_size > 50_000_000: return tmp_path = TRACT_ZIP.with_suffix(".zip.part") with urllib.request.urlopen(TRACT_ZIP_URL, timeout=120) as response: with tmp_path.open("wb") as out: while True: chunk = response.read(1024 * 1024) if not chunk: break out.write(chunk) tmp_path.rename(TRACT_ZIP) def import_tract_boundaries(state_fips): where = "STATEFP IN ({})".format( ",".join(f"'{state}'" for state in sorted(state_fips)) ) env = os.environ.copy() env.update( { "PGHOST": os.environ["PGWEB_HOST"], "PGPORT": os.environ["PGWEB_PORT"], "PGUSER": os.environ["PGWEB_USER"], "PGPASSWORD": os.environ["PGWEB_PASSWORD"], "PGDATABASE": DB_NAME, } ) cmd = [ "ogr2ogr", "-f", "PostgreSQL", "PG:dbname=data_centers", f"/vsizip/{TRACT_ZIP.resolve()}/cb_2024_us_tract_500k.shp", "-nln", BOUNDARY_STAGE_TABLE, "-overwrite", "-nlt", "MULTIPOLYGON", "-t_srs", "EPSG:4326", "-lco", "GEOMETRY_NAME=geom", "-lco", "FID=gid", "-where", where, ] subprocess.run(cmd, check=True, env=env) def fetch_acs_for_state(state_fips): variables = ["NAME", *ACS_VARIABLES.keys()] params = { "get": ",".join(variables), "for": "tract:*", "in": f"state:{state_fips} county:*", } api_key = os.environ.get("CENSUS_API_KEY") if api_key: params["key"] = api_key url = ( f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?" + urllib.parse.urlencode(params) ) with urllib.request.urlopen(url, timeout=120) as response: data = json.loads(response.read().decode("utf-8")) header = data[0] rows = [] for values in data[1:]: raw = dict(zip(header, values)) row = { "geoid": raw["state"] + raw["county"] + raw["tract"], "acs_name": raw["NAME"], "statefp": raw["state"], "countyfp": raw["county"], "tractce": raw["tract"], } for acs_var, column in ACS_VARIABLES.items(): row[column] = clean_acs_value(raw.get(acs_var), column) add_primary_industry(row) rows.append(row) return rows def clean_acs_value(value, column): if value in (None, "", "null") or value in SPECIAL_VALUES: return None if column in COUNT_COLUMNS: return int(Decimal(value)) if column in NUMERIC_COLUMNS: return Decimal(value) return value def add_primary_industry(row): industry_total = row.get("industry_total_workers") best_column = None best_value = None for column in INDUSTRY_COLUMNS: value = row.get(column) if value is None: continue if best_value is None or value > best_value: best_column = column best_value = value row["primary_industry"] = INDUSTRY_COLUMNS.get(best_column) row["primary_industry_workers"] = best_value if industry_total and best_value is not None: row["primary_industry_pct"] = Decimal(best_value * 100) / Decimal(industry_total) else: row["primary_industry_pct"] = None def fetch_acs(state_fips): rows = [] for state in state_fips: rows.extend(fetch_acs_for_state(state)) fieldnames = [ "geoid", "acs_name", "statefp", "countyfp", "tractce", *ACS_VARIABLES.values(), "primary_industry", "primary_industry_workers", "primary_industry_pct", ] with ACS_AUDIT_CSV.open("w", newline="", encoding="utf-8") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) return rows, fieldnames def load_acs_stage(conn, rows, fieldnames): with conn: with conn.cursor() as cur: cur.execute(f"drop table if exists {ACS_STAGE_TABLE}") cur.execute( f""" create table {ACS_STAGE_TABLE} ( geoid text primary key, acs_name text, statefp text, countyfp text, tractce text, population integer, median_age numeric, households integer, avg_household_size numeric, high_school_or_higher_pct numeric, bachelor_or_higher_pct numeric, broadband_subscription_pct numeric, population_16_over integer, labor_force integer, unemployed integer, unemployment_rate numeric, industry_total_workers integer, industry_agriculture_mining_workers integer, industry_construction_workers integer, industry_manufacturing_workers integer, industry_wholesale_trade_workers integer, industry_retail_trade_workers integer, industry_transportation_warehousing_utilities_workers integer, industry_information_workers integer, industry_finance_real_estate_workers integer, industry_professional_management_admin_workers integer, industry_education_health_social_workers integer, industry_arts_entertainment_food_workers integer, industry_other_services_workers integer, industry_public_administration_workers integer, median_household_income integer, per_capita_income integer, family_poverty_rate numeric, poverty_rate numeric, hispanic_latino_population integer, hispanic_latino_pct numeric, non_hispanic_white_population integer, non_hispanic_white_pct numeric, non_hispanic_black_population integer, non_hispanic_black_pct numeric, non_hispanic_asian_population integer, non_hispanic_asian_pct numeric, primary_industry text, primary_industry_workers integer, primary_industry_pct numeric ) """ ) values = [tuple(row.get(column) for column in fieldnames) for row in rows] execute_values( cur, f"insert into {ACS_STAGE_TABLE} ({', '.join(fieldnames)}) values %s", values, page_size=1000, ) cur.execute(f"analyze {ACS_STAGE_TABLE}") def create_final_table(conn): with conn: with conn.cursor() as cur: cur.execute("drop index if exists _dc_census_tract_boundaries_2024_geom_gix") cur.execute( f"create index _dc_census_tract_boundaries_2024_geom_gix on {BOUNDARY_STAGE_TABLE} using gist (geom)" ) cur.execute(f"analyze {BOUNDARY_STAGE_TABLE}") cur.execute( f""" create table {FINAL_TABLE} as with dc_tracts as ( select t.geoid, count(*)::integer as data_center_count, count(*) filter (where dc.geocode_precision = 'address_range')::integer as address_range_data_center_count, count(*) filter (where dc.geocode_precision = 'city')::integer as city_precision_data_center_count, array_agg(dc.id order by dc.id) as data_center_ids, array_agg(distinct dc.provider order by dc.provider) as providers from {BOUNDARY_STAGE_TABLE} t join {POINT_TABLE} dc on t.geom && dc.geom and ST_Covers(t.geom, dc.geom) group by t.geoid ) select t.geoid, t.statefp, t.countyfp, t.tractce, t.name as tract_name, t.namelsad, t.aland::bigint as land_area_sqm, t.awater::bigint as water_area_sqm, {ACS_YEAR}::integer as acs_year, '{ACS_SOURCE}'::text as acs_source, a.acs_name, d.data_center_count, d.address_range_data_center_count, d.city_precision_data_center_count, d.data_center_ids, d.providers, a.population, a.median_age, a.households, a.avg_household_size, a.high_school_or_higher_pct, a.bachelor_or_higher_pct, a.broadband_subscription_pct, a.population_16_over, a.labor_force, a.unemployed, a.unemployment_rate, a.median_household_income, a.per_capita_income, a.family_poverty_rate, a.poverty_rate, a.hispanic_latino_population, a.hispanic_latino_pct, a.non_hispanic_white_population, a.non_hispanic_white_pct, a.non_hispanic_black_population, a.non_hispanic_black_pct, a.non_hispanic_asian_population, a.non_hispanic_asian_pct, a.industry_total_workers, a.industry_agriculture_mining_workers, a.industry_construction_workers, a.industry_manufacturing_workers, a.industry_wholesale_trade_workers, a.industry_retail_trade_workers, a.industry_transportation_warehousing_utilities_workers, a.industry_information_workers, a.industry_finance_real_estate_workers, a.industry_professional_management_admin_workers, a.industry_education_health_social_workers, a.industry_arts_entertainment_food_workers, a.industry_other_services_workers, a.industry_public_administration_workers, a.primary_industry, a.primary_industry_workers, a.primary_industry_pct, t.geom::geometry(MultiPolygon, 4326) as geom from {BOUNDARY_STAGE_TABLE} t join dc_tracts d on d.geoid = t.geoid left join {ACS_STAGE_TABLE} a on a.geoid = t.geoid """ ) cur.execute(f"alter table {FINAL_TABLE} add primary key (geoid)") cur.execute( f"create index data_center_census_tracts_2024_geom_gix on {FINAL_TABLE} using gist (geom)" ) cur.execute( f"create index data_center_census_tracts_2024_state_county_idx on {FINAL_TABLE} (statefp, countyfp)" ) cur.execute( f"create index data_center_census_tracts_2024_dc_count_idx on {FINAL_TABLE} (data_center_count desc)" ) cur.execute( f""" comment on table {FINAL_TABLE} is 'Census tracts containing records from public.us_dc_sample_geocoded, enriched with ACS 2024 5-year profile demographics and derived primary industry fields.' """ ) cur.execute(f"analyze {FINAL_TABLE}") def assign_point_geoids(conn): with conn: with conn.cursor() as cur: cur.execute( f"alter table {POINT_TABLE} add column if not exists geoid text" ) cur.execute( f""" update {POINT_TABLE} dc set geoid = matched.geoid from ( select dc_inner.id, ( select t.geoid from {BOUNDARY_STAGE_TABLE} t where t.geom && dc_inner.geom and st_covers(t.geom, dc_inner.geom) order by t.geoid limit 1 ) as geoid from {POINT_TABLE} dc_inner ) matched where dc.id = matched.id """ ) cur.execute( f"create index if not exists us_dc_sample_geocoded_geoid_idx on {POINT_TABLE} (geoid)" ) cur.execute(f"analyze {POINT_TABLE}") def validate(conn): with conn.cursor() as cur: cur.execute( f""" select count(*)::integer as tract_rows, coalesce(sum(data_center_count), 0)::integer as assigned_data_centers, count(*) filter (where geom is not null)::integer as geom_rows from {FINAL_TABLE} """ ) summary = cur.fetchone() cur.execute(f"select count(*)::integer from {POINT_TABLE}") total_points = cur.fetchone()[0] cur.execute( f""" select geocode_precision, count(*)::integer from {POINT_TABLE} group by geocode_precision order by geocode_precision """ ) point_precision = cur.fetchall() cur.execute( f""" select count(*)::integer from {FINAL_TABLE} where population is null """ ) missing_acs = cur.fetchone()[0] return summary, total_points, point_precision, missing_acs def main(): parser = argparse.ArgumentParser( description="Build census-tract enrichment table for data-center points." ) parser.add_argument( "--replace-final", action="store_true", help="Drop and rebuild the final tract table if it already exists.", ) args = parser.parse_args() conn = connect() try: state_fips = get_state_fips(conn) if args.replace_final: drop_final_table_if_exists(conn) else: ensure_final_table_absent(conn) finally: conn.close() download_tract_boundaries() import_tract_boundaries(state_fips) acs_rows, acs_fieldnames = fetch_acs(state_fips) conn = connect() try: if args.replace_final: drop_final_table_if_exists(conn) else: ensure_final_table_absent(conn) load_acs_stage(conn, acs_rows, acs_fieldnames) create_final_table(conn) assign_point_geoids(conn) summary, total_points, point_precision, missing_acs = validate(conn) finally: conn.close() print(f"loaded {len(acs_rows)} ACS tract rows into {ACS_STAGE_TABLE}") print(f"created {FINAL_TABLE}") print( "tract_rows={0} assigned_data_centers={1} geom_rows={2} source_points={3}".format( summary[0], summary[1], summary[2], total_points ) ) print("point_precision=" + ", ".join(f"{k}:{v}" for k, v in point_precision)) print(f"tracts_missing_acs_population={missing_acs}") if __name__ == "__main__": main()