#!/usr/bin/env python3
"""
Ingest EIA energy infrastructure data via EIA API into PostGIS with GEOID linking.

Usage:
    ingest_eia_energy_layers.py [--category electric|power|gas|all]
                                [--max-records N]
                                [--skip-ingest]
                                [--list-only]

Features:
- Queries EIA API for energy infrastructure datasets
- Filters by category (electric grid, power plants, gas infrastructure)
- Imports API records to standardized PostGIS table names
- Creates source catalog, tract-level GEOID linkage, and summary tables
- Skips unavailable or invalid datasets gracefully

Environment:
- EIA_API_KEY: Required for ingest. Your EIA API key from https://www.eia.gov/opendata/
  (not needed for --list-only or --skip-ingest, which never call the API)
- PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD: PostGIS connection
"""

import argparse
import hashlib
import json
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
import psycopg2.extras
import requests

DB_NAME = "data_centers"
TRACT_TABLE = "public.data_center_census_tracts_2024"
LINK_TABLE = "public.energy_atlas_tract_link"
SUMMARY_TABLE = "public.energy_atlas_tract_summary"

# EIA API configuration
EIA_API_BASE = "https://api.eia.gov/v2"
# NOTE: the key is validated in main(), not here, so that importing this
# module (or running --list-only / --skip-ingest) never terminates the process.
EIA_API_KEY = os.environ.get("EIA_API_KEY")

SERVICE_URL_RE = re.compile(r"https?://[^\s\"']+", re.IGNORECASE)
SAFE_RE = re.compile(r"[^a-z0-9_]+")
# Matches the api_key query parameter so it can be scrubbed from log output.
API_KEY_RE = re.compile(r"(api_key=)[^&\s]+", re.IGNORECASE)

# EIA dataset categories mapped to infrastructure types
EIA_DATASETS = {
    "electric": {
        "category": "electric_grid",
        "endpoints": [
            "electricity/electric-power-operational-data",
            "electricity/rto/region-data",
        ],
    },
    "power": {
        "category": "power_plants",
        "endpoints": [
            "electricity/operating-generator-capacity",
            "electricity/facility-fuel",
        ],
    },
    "gas": {
        "category": "gas_infrastructure",
        "endpoints": [
            "natural-gas/move/ist",
            "natural-gas/stor/sum",
            "petroleum/stoc",
        ],
    },
}

# EIA endpoints that support lat/lon data fields for plant-level geocoding
EIA_GEOCODABLE_ENDPOINTS = {
    "electricity/operating-generator-capacity": ["latitude", "longitude"],
    "electricity/facility-fuel": ["latitude", "longitude"],
}

# Endpoints that do not reliably support retry with ad-hoc data[] field requests.
EIA_NO_RETRY_EXTRA_FIELDS = {
    "electricity/facility-fuel",
}

# US state abbreviation to FIPS code mapping for state-level GEOID linking
STATE_FIPS = {
    "AL": "01", "AK": "02", "AZ": "04", "AR": "05", "CA": "06",
    "CO": "08", "CT": "09", "DE": "10", "DC": "11", "FL": "12",
    "GA": "13", "HI": "15", "ID": "16", "IL": "17", "IN": "18",
    "IA": "19", "KS": "20", "KY": "21", "LA": "22", "ME": "23",
    "MD": "24", "MA": "25", "MI": "26", "MN": "27", "MS": "28",
    "MO": "29", "MT": "30", "NE": "31", "NV": "32", "NH": "33",
    "NJ": "34", "NM": "35", "NY": "36", "NC": "37", "ND": "38",
    "OH": "39", "OK": "40", "OR": "41", "PA": "42", "RI": "44",
    "SC": "45", "SD": "46", "TN": "47", "TX": "48", "UT": "49",
    "VT": "50", "VA": "51", "WA": "53", "WV": "54", "WI": "55",
    "WY": "56", "PR": "72",
}

# Geometry-class keyword -> (join predicate, length aggregate, area aggregate).
# The keyword is matched as a substring of the lower-cased ST_GeometryType()
# result, in this order (first match wins).
_SPATIAL_LINK_MODES = (
    ("point", "st_covers", "null", "null"),
    (
        "line",
        "st_intersects",
        "sum(st_length(st_intersection(t.geom, s.geom)::geography))",
        "null",
    ),
    (
        "polygon",
        "st_intersects",
        "null",
        "sum(st_area(st_intersection(t.geom, s.geom)::geography))",
    ),
)


@dataclass
class EIADataset:
    """One EIA API route selected for ingestion."""

    dataset_id: str  # endpoint path with "/" replaced by "_"
    name: str  # last path segment of the endpoint
    category: str  # infrastructure category, e.g. "power_plants"
    api_endpoint: str  # route relative to EIA_API_BASE
    description: str = ""
    source_url: str = ""


def slugify(value: str) -> str:
    """Convert string to SQL-safe identifier.

    Lower-cases *value*, replaces every run of characters outside
    ``[a-z0-9_]`` with a single underscore, and trims leading/trailing
    underscores. Returns ``"layer"`` when nothing usable remains.
    """
    cleaned = SAFE_RE.sub("_", value.lower()).strip("_")
    # SAFE_RE only collapses runs of *unsafe* characters; a pre-existing "_"
    # next to a replaced run would still leave "__".
    cleaned = re.sub(r"_+", "_", cleaned)
    return cleaned or "layer"


def standardize_table_name(dataset_id: str) -> str:
    """Generate standardized table name from dataset ID.

    The result is ``energy_eia_<slug>`` and never exceeds 55 characters; longer
    names are cut to 46 characters and suffixed with ``_`` plus an 8-hex-digit
    MD5 digest of the full name so distinct datasets stay distinct.
    """
    base = f"energy_eia_{slugify(dataset_id)}"
    if len(base) <= 55:
        return base
    digest = hashlib.md5(base.encode("utf-8")).hexdigest()[:8]
    return f"{base[:46]}_{digest}"


def _redact_api_key(text: str) -> str:
    """Replace the value of any ``api_key=`` query parameter in *text*."""
    return API_KEY_RE.sub(r"\1***", text)


def query_eia_api(
    endpoint: str,
    params: Optional[Dict[str, Any]] = None,
    extra_data_fields: Optional[List[str]] = None,
) -> Optional[Dict]:
    """Query EIA API endpoint.

    Args:
        endpoint: Route relative to ``EIA_API_BASE``; ``/data`` is appended
            when missing.
        params: Extra query parameters; they override the defaults
            (``api_key`` and ``length=5000``).
        extra_data_fields: Field names sent as ``data[0]``, ``data[1]``, ...

    Returns:
        The decoded JSON payload, or ``None`` when the request fails or the
        body is not valid JSON (the error is printed, never raised).
    """
    # EIA API uses /data/ suffix for data queries
    if not endpoint.endswith("/data"):
        endpoint = f"{endpoint}/data"

    url = f"{EIA_API_BASE}/{endpoint}/"
    req_params: Dict[str, Any] = {"api_key": EIA_API_KEY, "length": 5000}
    if params:
        req_params.update(params)

    # Add extra data fields (e.g., latitude, longitude) using EIA's array syntax
    if extra_data_fields:
        for i, field in enumerate(extra_data_fields):
            req_params[f"data[{i}]"] = field

    try:
        resp = requests.get(url, params=req_params, timeout=(10, 20))
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError) as e:
        # requests embeds the full request URL (including api_key=...) in HTTP
        # error messages, so scrub it before printing. ValueError covers a
        # non-JSON body on requests versions whose JSON error is not a
        # RequestException.
        print(f" api error on {endpoint}: {_redact_api_key(str(e))}")
        return None


def fetch_eia_records(
    endpoint: str,
    max_records: int = 0,
    extra_data_fields: Optional[List[str]] = None,
) -> Optional[List[Dict[str, Any]]]:
    """Fetch EIA records with pagination; retry without extra fields on unsupported endpoints.

    Args:
        endpoint: EIA route to page through.
        max_records: Upper bound on returned records; ``0`` (or less) means
            no limit.
        extra_data_fields: Optional ``data[]`` fields to request.

    Returns:
        ``None`` when the very first page cannot be fetched, otherwise the
        records collected so far (possibly empty, possibly partial if a later
        page fails).
    """
    page_size = 5000
    offset = 0
    records: List[Dict[str, Any]] = []
    used_extra_fields = extra_data_fields
    previous_first_row: Optional[str] = None

    while True:
        # Never ask for more rows than the caller still wants; a small
        # --max-records should not download a full 5000-row page.
        if max_records > 0:
            length = min(page_size, max_records - len(records))
        else:
            length = page_size
        params = {"offset": offset, "length": length}
        data = query_eia_api(endpoint, params=params, extra_data_fields=used_extra_fields)

        # Some endpoints return 400 when requesting unsupported fields (e.g. lat/lon).
        if (
            data is None
            and used_extra_fields
            and endpoint not in EIA_NO_RETRY_EXTRA_FIELDS
        ):
            print(f" retrying {endpoint} without extra data fields")
            used_extra_fields = None
            data = query_eia_api(endpoint, params=params, extra_data_fields=None)

        if not data:
            if offset == 0:
                return None
            # Best effort: keep what was fetched, but make the truncation visible.
            print(
                f" warning: pagination of {endpoint} stopped at offset {offset}; "
                f"keeping {len(records)} records"
            )
            return records

        # "response"/"data" may be present but null; treat that like missing.
        response = data.get("response") or {}
        page_records = response.get("data") or []
        if not page_records:
            return records

        # Some EIA endpoints ignore offset and repeat page 1 forever.
        # Detect repeated first row signature and stop pagination.
        first_row_sig = json.dumps(page_records[0], sort_keys=True, default=str)
        if previous_first_row is not None and first_row_sig == previous_first_row:
            return records
        previous_first_row = first_row_sig

        records.extend(page_records)

        total = response.get("total")
        try:
            total_int = int(total) if total is not None else None
        except (TypeError, ValueError):
            total_int = None

        if max_records > 0 and len(records) >= max_records:
            return records[:max_records]
        if total_int is not None and len(records) >= total_int:
            return records
        # A short page means the server has nothing more to give.
        if len(page_records) < length:
            return records

        offset += len(page_records)


def get_eia_datasets(category: str = "all") -> List[EIADataset]:
    """Discover EIA datasets by category.

    Args:
        category: A key of ``EIA_DATASETS`` or ``"all"``. Unknown keys yield
            an empty list.

    Returns:
        One ``EIADataset`` per configured endpoint, in configuration order.
    """
    cats = list(EIA_DATASETS) if category == "all" else [category]
    datasets: List[EIADataset] = []
    for cat in cats:
        cat_info = EIA_DATASETS.get(cat)
        if cat_info is None:
            continue
        for endpoint in cat_info.get("endpoints", []):
            datasets.append(
                EIADataset(
                    dataset_id=endpoint.replace("/", "_"),
                    name=endpoint.split("/")[-1],
                    category=cat_info["category"],
                    api_endpoint=endpoint,
                    source_url=f"{EIA_API_BASE}/{endpoint}",
                )
            )
    return datasets


def parse_seed_line(line: str) -> Optional[EIADataset]:
    """Parse stub - not used with EIA API."""
    return None


def read_seed_file(seed_path: Path) -> List[EIADataset]:
    """Stub - datasets discovered via EIA API catalog."""
    return []


def classify_layer(text: str) -> Optional[str]:
    """Stub - classification handled by get_eia_datasets()."""
    return None


def _first_present(record: Dict[str, Any], *keys: str) -> Any:
    """Return the first value under *keys* that is neither ``None`` nor ``""``.

    Unlike ``a or b`` this keeps a legitimate numeric ``0`` coordinate.
    """
    for key in keys:
        value = record.get(key)
        if value is not None and value != "":
            return value
    return None


def _extract_lat_lon(record: Dict[str, Any]) -> Optional[Tuple[float, float]]:
    """Return ``(lat, lon)`` from *record* when both parse and are in range."""
    lat_raw = _first_present(record, "latitude", "lat")
    lon_raw = _first_present(record, "longitude", "lon")
    if lat_raw is None or lon_raw is None:
        return None
    try:
        lat = float(lat_raw)
        lon = float(lon_raw)
    except (TypeError, ValueError):
        return None
    # The chained comparisons are also False for NaN, which float() accepts.
    if -90 <= lat <= 90 and -180 <= lon <= 180:
        return lat, lon
    return None


def _record_to_row(record: Dict[str, Any]) -> Dict[str, Any]:
    """Build the INSERT parameter dict (``lon``, ``lat``, ``props``) for one record."""
    coords = _extract_lat_lon(record)
    lat, lon = coords if coords is not None else (None, None)
    return {"lon": lon, "lat": lat, "props": json.dumps(record)}


def import_layer_to_postgis(dataset: EIADataset, table_name: str, max_records: int = 0) -> bool:
    """Import EIA dataset to PostGIS table.

    Fetches the dataset from the API, then (re)creates ``public.<table_name>``
    and replaces its contents. Table creation, truncation and all inserts run
    in ONE transaction, so a failure leaves the previous contents untouched.

    Args:
        dataset: Dataset to fetch.
        table_name: Target table name; must already be a safe identifier
            (see ``standardize_table_name``) because it is interpolated
            into SQL.
        max_records: Cap on records to import; ``0`` means all.

    Returns:
        ``True`` when at least one row was written and committed, else
        ``False``. Fetch and SQL errors are printed and reported as ``False``;
        a failure to connect to the database propagates to the caller.
    """
    # Check if this endpoint supports lat/lon geocoding
    extra_fields = EIA_GEOCODABLE_ENDPOINTS.get(dataset.api_endpoint)

    # Query EIA API for data (with pagination), requesting lat/lon when
    # supported. Done before connecting so no DB connection sits idle during
    # a potentially long download.
    try:
        records = fetch_eia_records(
            dataset.api_endpoint,
            max_records=max_records,
            extra_data_fields=extra_fields,
        )
        if not records:
            print(" no data returned")
            return False
        # All per-row validation happens here in Python, so the only thing
        # left for the database to reject is the statement itself.
        rows = [_record_to_row(record) for record in records]
    except Exception as e:  # boundary: a bad dataset must not stop the run
        print(f" error: {e}")
        return False

    geo_count = sum(1 for row in rows if row["lon"] is not None)

    insert_sql = f"""
        INSERT INTO public.{table_name} (geom, properties)
        VALUES (
            CASE
                WHEN %(lon)s::float8 IS NULL THEN NULL
                ELSE ST_SetSRID(ST_MakePoint(%(lon)s::float8, %(lat)s::float8), 4326)
            END,
            %(props)s
        )
    """

    conn = connect_db()
    try:
        with conn:
            with conn.cursor() as cur:
                # Create target table only once data is confirmed.
                cur.execute(
                    f"""
                    CREATE TABLE IF NOT EXISTS public.{table_name} (
                        gid SERIAL PRIMARY KEY,
                        geom GEOMETRY(GEOMETRY, 4326),
                        properties JSONB
                    )
                    """
                )
                # Truncate to avoid duplicates on re-runs. TRUNCATE is
                # transactional in PostgreSQL, so it is undone if the insert
                # below fails.
                cur.execute(f"TRUNCATE TABLE public.{table_name}")
                # A failed statement aborts the whole PostgreSQL transaction,
                # so skipping individual bad rows inside it cannot work; the
                # batch either fully succeeds or is rolled back.
                psycopg2.extras.execute_batch(cur, insert_sql, rows, page_size=1000)
    except Exception as e:  # boundary: a bad dataset must not stop the run
        print(f" error: {e}")
        return False
    finally:
        conn.close()

    count = len(rows)
    geo_msg = f", {geo_count} with geometry" if extra_fields else ""
    print(f" inserted {count} features into {table_name}{geo_msg}")
    return count > 0


def connect_db():
    """Connect to PostGIS database.

    Reads PGWEB_HOST, PGWEB_PORT, PGWEB_USER and PGWEB_PASSWORD from the
    environment.

    Raises:
        KeyError: If one of the PGWEB_* variables is not set.
    """
    return psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        dbname=DB_NAME,
    )


def ensure_source_catalog_table(conn):
    """Create source catalog table if it doesn't exist."""
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                create table if not exists public.energy_atlas_layers_catalog (
                    table_name text primary key,
                    source_item_id text,
                    source_type text,
                    source_title text,
                    source_owner text,
                    source_url text,
                    category text,
                    imported_at timestamptz default now()
                )
                """
            )


def upsert_layer_catalog(conn, table_name: str, dataset: EIADataset, category: str):
    """Upsert layer metadata into source catalog.

    Inserts a row keyed by *table_name*, or refreshes every column (including
    ``imported_at``) when the row already exists.
    """
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                insert into public.energy_atlas_layers_catalog (
                    table_name, source_item_id, source_type, source_title,
                    source_owner, source_url, category, imported_at
                ) values (%s,%s,%s,%s,%s,%s,%s,now())
                on conflict (table_name) do update set
                    source_item_id = excluded.source_item_id,
                    source_type = excluded.source_type,
                    source_title = excluded.source_title,
                    source_owner = excluded.source_owner,
                    source_url = excluded.source_url,
                    category = excluded.category,
                    imported_at = now()
                """,
                (
                    table_name,
                    dataset.dataset_id,
                    "EIA API",
                    dataset.name,
                    "U.S. Energy Information Administration",
                    dataset.source_url,
                    category,
                ),
            )


def add_geom_index_and_analyze(conn, table_name: str):
    """Create spatial index and analyze table."""
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                f"create index if not exists {table_name}_geom_gix on public.{table_name} using gist (geom)"
            )
            cur.execute(f"analyze public.{table_name}")


def table_geom_class(conn, table_name: str) -> Optional[str]:
    """Get geometry type of table.

    Returns:
        The lower-cased ``ST_GeometryType`` of one arbitrary non-null geometry
        (e.g. ``"st_point"``), or ``None`` when the table has no geometries.
    """
    with conn.cursor() as cur:
        cur.execute(
            f"""
            select st_geometrytype(geom)
            from public.{table_name}
            where geom is not null
            limit 1
            """
        )
        row = cur.fetchone()
    if not row:
        return None
    return (row[0] or "").lower()


def reset_link_tables(conn):
    """Recreate GEOID linkage tables (drops all existing link rows)."""
    with conn:
        with conn.cursor() as cur:
            cur.execute(f"drop table if exists {LINK_TABLE}")
            cur.execute(
                f"""
                create table {LINK_TABLE} (
                    geoid text not null,
                    source_table text not null,
                    category text not null,
                    feature_count integer,
                    intersect_length_m numeric,
                    intersect_area_sqm numeric,
                    unique (geoid, source_table)
                )
                """
            )


def _spatial_link_sql(table_name: str, predicate: str, length_expr: str, area_expr: str) -> str:
    """Build the tract/feature spatial-join INSERT for one geometry class."""
    return f"""
        INSERT INTO {LINK_TABLE} (
            geoid, source_table, category, feature_count,
            intersect_length_m, intersect_area_sqm
        )
        SELECT
            t.geoid,
            %s,
            %s,
            count(*)::integer,
            {length_expr},
            {area_expr}
        FROM {TRACT_TABLE} t
        JOIN public.{table_name} s
          ON t.geom && s.geom
         AND {predicate}(t.geom, s.geom)
        GROUP BY t.geoid
    """


def link_one_table(conn, table_name: str, category: str):
    """Link infrastructure features to census tracts via spatial join or state FIPS fallback.

    Rows with geometry are joined spatially to ``TRACT_TABLE`` (points by
    coverage; lines and polygons by intersection, recording intersected
    length or area). Rows without geometry but with a ``stateid`` property
    are then linked at state level by ``link_table_by_state``.
    """
    gclass = table_geom_class(conn, table_name)

    # Check if table has a properties column (EIA API tables do; legacy tables may not)
    with conn.cursor() as cur:
        cur.execute(
            "SELECT count(*) FROM information_schema.columns "
            "WHERE table_schema='public' AND table_name=%s AND column_name='properties'",
            (table_name,),
        )
        has_properties = cur.fetchone()[0] > 0

    # Check if table has any rows with stateid for state-level fallback
    stateid_count = 0
    if has_properties:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT count(*) FROM public.{table_name} WHERE geom IS NULL AND properties->>'stateid' IS NOT NULL"
            )
            stateid_count = cur.fetchone()[0]

    if gclass:
        # Spatial join for rows that have geometry
        mode = next((m for m in _SPATIAL_LINK_MODES if m[0] in gclass), None)
        with conn:
            with conn.cursor() as cur:
                if mode is not None:
                    _, predicate, length_expr, area_expr = mode
                    cur.execute(
                        _spatial_link_sql(table_name, predicate, length_expr, area_expr),
                        (table_name, category),
                    )

    # State-level GEOID fallback: link rows without geometry using stateid → state FIPS prefix
    if stateid_count > 0:
        link_table_by_state(conn, table_name, category)


def link_table_by_state(conn, table_name: str, category: str):
    """Link EIA records to census tracts via state FIPS code prefix on GEOID.

    EIA data has stateid (2-letter abbreviation). We map to FIPS and match all
    census tracts whose GEOID starts with that state FIPS code. Every matching
    tract gets one link row whose feature_count is the number of geometry-less
    records for its state; tracts that already have a link row for this table
    are left unchanged.
    """
    # Build a VALUES list for the state FIPS mapping (constants only, so plain
    # string formatting is safe here).
    fips_values = ", ".join(f"('{abbr}', '{fips}')" for abbr, fips in STATE_FIPS.items())

    with conn:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                INSERT INTO {LINK_TABLE} (
                    geoid, source_table, category, feature_count,
                    intersect_length_m, intersect_area_sqm
                )
                SELECT
                    t.geoid,
                    %s,
                    %s,
                    state_counts.record_count::integer,
                    null,
                    null
                FROM (
                    SELECT
                        sf.fips AS state_fips,
                        count(*) AS record_count
                    FROM public.{table_name} s
                    JOIN (VALUES {fips_values}) AS sf(abbr, fips)
                      ON upper(s.properties->>'stateid') = sf.abbr
                    WHERE s.geom IS NULL
                    GROUP BY sf.fips
                ) state_counts
                JOIN {TRACT_TABLE} t
                  ON left(t.geoid, 2) = state_counts.state_fips
                ON CONFLICT DO NOTHING
                """,
                (table_name, category),
            )
            rows = cur.rowcount
            if rows > 0:
                print(f" state-level geocoding linked {rows} tract rows for {table_name}")


def build_summary_table(conn):
    """Create GEOID summary aggregating all categories.

    Drops and rebuilds ``SUMMARY_TABLE`` from ``LINK_TABLE`` (one row per
    GEOID), then indexes and analyzes both tables.
    """
    with conn:
        with conn.cursor() as cur:
            cur.execute(f"drop table if exists {SUMMARY_TABLE}")
            cur.execute(
                f"""
                create table {SUMMARY_TABLE} as
                select
                    geoid,
                    sum(feature_count) filter (where category = 'electric_grid')::integer as electric_grid_feature_count,
                    sum(coalesce(intersect_length_m, 0)) filter (where category = 'electric_grid') as electric_grid_length_m,
                    sum(coalesce(intersect_area_sqm, 0)) filter (where category = 'electric_grid') as electric_grid_area_sqm,
                    sum(feature_count) filter (where category = 'power_plants')::integer as power_plant_feature_count,
                    sum(feature_count) filter (where category = 'gas_infrastructure')::integer as gas_infrastructure_feature_count,
                    sum(coalesce(intersect_length_m, 0)) filter (where category = 'gas_infrastructure') as gas_infrastructure_length_m,
                    sum(coalesce(intersect_area_sqm, 0)) filter (where category = 'gas_infrastructure') as gas_infrastructure_area_sqm
                from {LINK_TABLE}
                group by geoid
                """
            )
            cur.execute(f"alter table {SUMMARY_TABLE} add primary key (geoid)")
            # "if not exists" keeps this callable more than once without an
            # intervening reset_link_tables().
            cur.execute(
                f"create index if not exists energy_atlas_tract_link_geoid_idx on {LINK_TABLE} (geoid)"
            )
            cur.execute(
                f"create index if not exists energy_atlas_tract_link_category_idx on {LINK_TABLE} (category)"
            )
            cur.execute(f"analyze {LINK_TABLE}")
            cur.execute(f"analyze {SUMMARY_TABLE}")


def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description=(
            "Ingest EIA energy infrastructure data via EIA API into PostGIS with GEOID linking."
        )
    )
    parser.add_argument(
        "--category",
        choices=["electric", "power", "gas", "all"],
        default="all",
        help="Infrastructure category to ingest.",
    )
    parser.add_argument(
        "--max-records",
        type=int,
        default=0,
        help="Cap on API records to process per dataset (0=all).",
    )
    parser.add_argument(
        "--skip-ingest",
        action="store_true",
        help="Skip import; rebuild GEOID links/summary only.",
    )
    parser.add_argument(
        "--list-only",
        action="store_true",
        help="List selected datasets and exit.",
    )
    return parser.parse_args()


def main():
    """Main ingestion pipeline.

    Steps: select datasets, optionally import each one, then rebuild the
    tract link table and the per-GEOID summary from every table listed in
    the source catalog.
    """
    args = parse_args()

    # The API key is only needed when records are actually downloaded.
    needs_api = not args.list_only and not args.skip_ingest
    if needs_api and not EIA_API_KEY:
        print("ERROR: EIA_API_KEY not set. Export it and try again.", file=sys.stderr)
        sys.exit(1)

    # Discover EIA datasets by category
    datasets = get_eia_datasets(args.category)
    if not datasets:
        raise RuntimeError(f"No datasets found for category '{args.category}'")

    if args.max_records > 0:
        print(f"limiting to {args.max_records} records per dataset")

    # Build ingest list with table names
    datasets_to_ingest = [
        (dataset, standardize_table_name(dataset.dataset_id), dataset.category)
        for dataset in datasets
    ]

    if args.list_only:
        print(f"selected_datasets={len(datasets_to_ingest)}")
        for dataset, table_name, category in datasets_to_ingest:
            print(f"{category}\tpublic.{table_name}\t{dataset.name}\t{dataset.source_url}")
        return

    conn = connect_db()
    try:
        ensure_source_catalog_table(conn)

        if not args.skip_ingest:
            for dataset, table_name, category in datasets_to_ingest:
                try:
                    print(f"importing {dataset.name} -> public.{table_name} [{category}]")
                    success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records)
                    if success:
                        upsert_layer_catalog(conn, table_name, dataset, category)
                        add_geom_index_and_analyze(conn, table_name)
                except Exception as e:  # boundary: one dataset must not stop the run
                    print(f" warning: import failed ({type(e).__name__}); skipping")
                    continue

        # Rebuild GEOID links from catalog.
        with conn.cursor() as cur:
            cur.execute(
                """
                select table_name, category
                from public.energy_atlas_layers_catalog
                order by table_name
                """
            )
            catalog_rows = cur.fetchall()

        reset_link_tables(conn)
        for table_name, category in catalog_rows:
            print(f"linking public.{table_name} -> GEOID ({category})")
            try:
                link_one_table(conn, table_name, category)
            except psycopg2.Error as e:
                # A stale catalog entry (e.g. its table was dropped) must not
                # abort the rebuild after the old links were already removed.
                # Roll back so the connection is usable for the next table.
                conn.rollback()
                print(f" warning: linking failed ({type(e).__name__}: {str(e).strip()}); skipping")

        build_summary_table(conn)

        with conn.cursor() as cur:
            cur.execute("select count(*) from public.energy_atlas_layers_catalog")
            catalog_count = cur.fetchone()[0]
            cur.execute(f"select count(*) from {LINK_TABLE}")
            link_count = cur.fetchone()[0]
            cur.execute(f"select count(*) from {SUMMARY_TABLE}")
            summary_count = cur.fetchone()[0]

        print(
            f"\ndone: catalog_layers={catalog_count}, "
            f"geoid_links={link_count}, geoid_summary_rows={summary_count}"
        )
    finally:
        conn.close()


if __name__ == "__main__":
    main()