Add master data center merge workflow

This commit is contained in:
2026-05-17 18:53:16 -07:00
parent 8fcbb18e37
commit 90e8b21423
10 changed files with 11892 additions and 9599 deletions

View File

@@ -14,7 +14,8 @@ from psycopg2.extras import execute_values
DB_NAME = "data_centers"
POINT_TABLE = "public.us_dc_sample_geocoded"
POINT_TABLE = "public.master_data_centers"
POINT_ID_COL = "master_id"
BOUNDARY_STAGE_TABLE = "public._dc_census_tract_boundaries_2024"
ACS_STAGE_TABLE = "public._dc_census_tract_acs_2024"
FINAL_TABLE = "public.data_center_census_tracts_2024"
@@ -27,6 +28,25 @@ TRACT_ZIP_URL = (
)
ACS_AUDIT_CSV = Path("census_tract_acs_2024_selected_states.csv")
# Full state / territory name -> two-letter USPS postal code.
# The U.S. Virgin Islands appear under three spellings that all map to "VI".
STATE_NAME_TO_CODE = {
    # 50 states + District of Columbia
    "Alabama": "AL",
    "Alaska": "AK",
    "Arizona": "AZ",
    "Arkansas": "AR",
    "California": "CA",
    "Colorado": "CO",
    "Connecticut": "CT",
    "Delaware": "DE",
    "District of Columbia": "DC",
    "Florida": "FL",
    "Georgia": "GA",
    "Hawaii": "HI",
    "Idaho": "ID",
    "Illinois": "IL",
    "Indiana": "IN",
    "Iowa": "IA",
    "Kansas": "KS",
    "Kentucky": "KY",
    "Louisiana": "LA",
    "Maine": "ME",
    "Maryland": "MD",
    "Massachusetts": "MA",
    "Michigan": "MI",
    "Minnesota": "MN",
    "Mississippi": "MS",
    "Missouri": "MO",
    "Montana": "MT",
    "Nebraska": "NE",
    "Nevada": "NV",
    "New Hampshire": "NH",
    "New Jersey": "NJ",
    "New Mexico": "NM",
    "New York": "NY",
    "North Carolina": "NC",
    "North Dakota": "ND",
    "Ohio": "OH",
    "Oklahoma": "OK",
    "Oregon": "OR",
    "Pennsylvania": "PA",
    "Rhode Island": "RI",
    "South Carolina": "SC",
    "South Dakota": "SD",
    "Tennessee": "TN",
    "Texas": "TX",
    "Utah": "UT",
    "Vermont": "VT",
    "Virginia": "VA",
    "Washington": "WA",
    "West Virginia": "WV",
    "Wisconsin": "WI",
    "Wyoming": "WY",
    # Territories
    "American Samoa": "AS",
    "Guam": "GU",
    "Northern Mariana Islands": "MP",
    "Puerto Rico": "PR",
    "United States Virgin Islands": "VI",
    "U.S. Virgin Islands": "VI",
    "Virgin Islands": "VI",
}
STATE_FIPS = {
"AL": "01",
"AK": "02",
@@ -198,16 +218,45 @@ def connect():
)
def normalize_state(value):
    """Map a raw state value to a two-letter postal code.

    ``value`` may be a postal code that is a key of ``STATE_FIPS`` or a full
    name that is a key of ``STATE_NAME_TO_CODE``. Surrounding whitespace is
    ignored for both forms (previously only the name lookup stripped it, so
    a padded code such as ``" CA "`` was rejected). Postal codes are also
    accepted in lower/mixed case.

    Returns the postal code, or ``None`` when the value is ``None``, blank,
    or not recognized.
    """
    if value is None:
        return None
    cleaned = value.strip()
    if not cleaned:
        return None
    # Try the exact text first so anything the old check accepted still
    # matches, then fall back to the upper-cased form.
    for candidate in (cleaned, cleaned.upper()):
        if candidate in STATE_FIPS:
            return candidate
    return STATE_NAME_TO_CODE.get(cleaned)
def get_state_fips(conn):
    """Return the sorted state FIPS codes whose tract data must be loaded.

    Reads the per-state row counts from ``POINT_TABLE``, normalizes each
    non-NULL ``state`` value with ``normalize_state`` and looks the result up
    in ``STATE_FIPS``.

    * If any row has a NULL state, a warning is printed and the FIPS codes of
      every ``STATE_FIPS`` entry except AS, GU, MP and VI are returned, so the
      spatial join can place those rows.
    * Otherwise only the FIPS codes of the states actually present are
      returned.

    Args:
        conn: open database connection exposing ``cursor()`` as a context
            manager (presumably psycopg2 -- confirm against ``connect()``).

    Raises:
        RuntimeError: if a state value cannot be normalized, or a normalized
            code has no entry in ``STATE_FIPS``.
    """
    # Only the grouped query is issued; the superseded
    # "select distinct state_code" statement is dropped.
    with conn.cursor() as cur:
        cur.execute(
            f"select state, count(*) from {POINT_TABLE} group by state order by state nulls last"
        )
        rows = cur.fetchall()

    normalized_counts = {}
    null_state_count = 0
    unknown = []
    for raw, count in rows:
        if raw is None:
            null_state_count += count
            continue
        code = normalize_state(raw)
        if code is None:
            unknown.append((raw, count))
            continue
        normalized_counts[code] = normalized_counts.get(code, 0) + count

    if unknown:
        details = ", ".join(f"{name!r}({n})" for name, n in unknown)
        raise RuntimeError(f"Unrecognized state values in {POINT_TABLE}: {details}")

    # A name can normalize to a code that STATE_FIPS does not carry; report
    # it explicitly rather than failing later with a bare KeyError.
    missing = sorted(code for code in normalized_counts if code not in STATE_FIPS)
    if missing:
        raise RuntimeError(f"Missing state FIPS mappings for: {', '.join(missing)}")

    if null_state_count:
        print(
            f"warning: {null_state_count} master_data_centers rows have NULL state; "
            f"importing tract boundaries for all 50 states + DC + PR so spatial join can resolve them."
        )
        # Census ACS 5-year DP profile lacks coverage for the small island territories;
        # restrict to the 50 states + DC + PR which the ACS profile reliably serves.
        excluded_territories = {"AS", "GU", "MP", "VI"}
        return sorted(
            {
                fips
                for code, fips in STATE_FIPS.items()
                if code not in excluded_territories
            }
        )

    return sorted({STATE_FIPS[code] for code in normalized_counts})
def ensure_final_table_absent(conn):
@@ -290,8 +339,20 @@ def fetch_acs_for_state(state_fips):
f"https://api.census.gov/data/{ACS_YEAR}/acs/acs5/profile?"
+ urllib.parse.urlencode(params)
)
with urllib.request.urlopen(url, timeout=120) as response:
data = json.loads(response.read().decode("utf-8"))
try:
with urllib.request.urlopen(url, timeout=120) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(
f"Census ACS request failed for state {state_fips}: HTTP {exc.code}{body[:300]}"
) from exc
try:
data = json.loads(body)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"Census ACS returned non-JSON for state {state_fips}: {body[:300]}"
) from exc
header = data[0]
rows = []
@@ -444,12 +505,15 @@ def create_final_table(conn):
select
t.geoid,
count(*)::integer as data_center_count,
count(*) filter (where dc.geocode_precision = 'address_range')::integer
as address_range_data_center_count,
count(*) filter (where dc.geocode_precision = 'city')::integer
as city_precision_data_center_count,
array_agg(dc.id order by dc.id) as data_center_ids,
array_agg(distinct dc.provider order by dc.provider) as providers
count(*) filter (where dc.source = 'curated')::integer
as curated_only_data_center_count,
count(*) filter (where dc.source = 'merged')::integer
as merged_data_center_count,
count(*) filter (where dc.source = 'osm')::integer
as osm_only_data_center_count,
array_agg(dc.{POINT_ID_COL} order by dc.{POINT_ID_COL}) as data_center_ids,
array_agg(distinct dc.operator) filter (where dc.operator is not null)
as operators
from {BOUNDARY_STAGE_TABLE} t
join {POINT_TABLE} dc
on t.geom && dc.geom
@@ -469,10 +533,11 @@ def create_final_table(conn):
'{ACS_SOURCE}'::text as acs_source,
a.acs_name,
d.data_center_count,
d.address_range_data_center_count,
d.city_precision_data_center_count,
d.curated_only_data_center_count,
d.merged_data_center_count,
d.osm_only_data_center_count,
d.data_center_ids,
d.providers,
d.operators,
a.population,
a.median_age,
a.households,
@@ -532,7 +597,7 @@ def create_final_table(conn):
cur.execute(
f"""
comment on table {FINAL_TABLE} is
'Census tracts containing records from public.us_dc_sample_geocoded, enriched with ACS 2024 5-year profile demographics and derived primary industry fields.'
'Census tracts containing records from public.master_data_centers (curated + OSM merged), enriched with ACS 2024 5-year profile demographics and derived primary industry fields.'
"""
)
cur.execute(f"analyze {FINAL_TABLE}")
@@ -550,7 +615,7 @@ def assign_point_geoids(conn):
set geoid = matched.geoid
from (
select
dc_inner.id,
dc_inner.{POINT_ID_COL} as point_id,
(
select t.geoid
from {BOUNDARY_STAGE_TABLE} t
@@ -561,11 +626,11 @@ def assign_point_geoids(conn):
) as geoid
from {POINT_TABLE} dc_inner
) matched
where dc.id = matched.id
where dc.{POINT_ID_COL} = matched.point_id
"""
)
cur.execute(
f"create index if not exists us_dc_sample_geocoded_geoid_idx on {POINT_TABLE} (geoid)"
f"create index if not exists master_data_centers_geoid_idx on {POINT_TABLE} (geoid)"
)
cur.execute(f"analyze {POINT_TABLE}")
@@ -586,13 +651,21 @@ def validate(conn):
total_points = cur.fetchone()[0]
cur.execute(
f"""
select geocode_precision, count(*)::integer
select source, count(*)::integer
from {POINT_TABLE}
group by geocode_precision
order by geocode_precision
group by source
order by source
"""
)
point_precision = cur.fetchall()
point_source_breakdown = cur.fetchall()
cur.execute(
f"""
select count(*)::integer
from {POINT_TABLE}
where geoid is null
"""
)
unassigned_points = cur.fetchone()[0]
cur.execute(
f"""
select count(*)::integer
@@ -601,7 +674,7 @@ def validate(conn):
"""
)
missing_acs = cur.fetchone()[0]
return summary, total_points, point_precision, missing_acs
return summary, total_points, point_source_breakdown, unassigned_points, missing_acs
def main():
@@ -638,7 +711,7 @@ def main():
load_acs_stage(conn, acs_rows, acs_fieldnames)
create_final_table(conn)
assign_point_geoids(conn)
summary, total_points, point_precision, missing_acs = validate(conn)
summary, total_points, point_source_breakdown, unassigned_points, missing_acs = validate(conn)
finally:
conn.close()
@@ -649,7 +722,8 @@ def main():
summary[0], summary[1], summary[2], total_points
)
)
print("point_precision=" + ", ".join(f"{k}:{v}" for k, v in point_precision))
print("point_source=" + ", ".join(f"{k}:{v}" for k, v in point_source_breakdown))
print(f"points_unassigned_to_tract={unassigned_points}")
print(f"tracts_missing_acs_population={missing_acs}")