first commit
This commit is contained in:
760
ingest_eia_energy_layers.py
Normal file
760
ingest_eia_energy_layers.py
Normal file
@@ -0,0 +1,760 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Ingest EIA energy infrastructure data via EIA API into PostGIS with GEOID linking.
|
||||
|
||||
Usage:
|
||||
ingest_eia_energy_layers.py [--category electric|power|gas|all] [--max-records N] [--skip-ingest] [--list-only]
|
||||
|
||||
Features:
|
||||
- Queries EIA API for energy infrastructure datasets
|
||||
- Filters by category (electric grid, power plants, gas infrastructure)
|
||||
- Imports GeoJSON features to standardized PostGIS table names
|
||||
- Creates source catalog, tract-level GEOID linkage, and summary tables
|
||||
- Skips unavailable or invalid datasets gracefully
|
||||
|
||||
Environment:
|
||||
- EIA_API_KEY: Required. Your EIA API key from https://www.eia.gov/opendata/
|
||||
- PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD: PostGIS connection
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
import psycopg2
|
||||
import requests
|
||||
|
||||
|
||||
DB_NAME = "data_centers"
|
||||
TRACT_TABLE = "public.data_center_census_tracts_2024"
|
||||
LINK_TABLE = "public.energy_atlas_tract_link"
|
||||
SUMMARY_TABLE = "public.energy_atlas_tract_summary"
|
||||
|
||||
# EIA API configuration
|
||||
EIA_API_BASE = "https://api.eia.gov/v2"
|
||||
EIA_API_KEY = os.environ.get("EIA_API_KEY")
|
||||
if not EIA_API_KEY:
|
||||
print("ERROR: EIA_API_KEY environment variable not set")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
||||
SERVICE_URL_RE = re.compile(r"https?://[^\s\"']+", re.IGNORECASE)
|
||||
SAFE_RE = re.compile(r"[^a-z0-9_]+")
|
||||
|
||||
# EIA dataset categories mapped to infrastructure types
|
||||
EIA_DATASETS = {
|
||||
"electric": {
|
||||
"category": "electric_grid",
|
||||
"endpoints": [
|
||||
"electricity/electric-power-operational-data",
|
||||
"electricity/rto/region-data",
|
||||
],
|
||||
},
|
||||
"power": {
|
||||
"category": "power_plants",
|
||||
"endpoints": [
|
||||
"electricity/operating-generator-capacity",
|
||||
"electricity/facility-fuel",
|
||||
],
|
||||
},
|
||||
"gas": {
|
||||
"category": "gas_infrastructure",
|
||||
"endpoints": [
|
||||
"natural-gas/move/ist",
|
||||
"natural-gas/stor/sum",
|
||||
"petroleum/stoc",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
# EIA endpoints that support lat/lon data fields for plant-level geocoding
|
||||
EIA_GEOCODABLE_ENDPOINTS = {
|
||||
"electricity/operating-generator-capacity": ["latitude", "longitude"],
|
||||
"electricity/facility-fuel": ["latitude", "longitude"],
|
||||
}
|
||||
|
||||
# US state abbreviation to FIPS code mapping for state-level GEOID linking
|
||||
STATE_FIPS = {
|
||||
"AL": "01", "AK": "02", "AZ": "04", "AR": "05", "CA": "06",
|
||||
"CO": "08", "CT": "09", "DE": "10", "DC": "11", "FL": "12",
|
||||
"GA": "13", "HI": "15", "ID": "16", "IL": "17", "IN": "18",
|
||||
"IA": "19", "KS": "20", "KY": "21", "LA": "22", "ME": "23",
|
||||
"MD": "24", "MA": "25", "MI": "26", "MN": "27", "MS": "28",
|
||||
"MO": "29", "MT": "30", "NE": "31", "NV": "32", "NH": "33",
|
||||
"NJ": "34", "NM": "35", "NY": "36", "NC": "37", "ND": "38",
|
||||
"OH": "39", "OK": "40", "OR": "41", "PA": "42", "RI": "44",
|
||||
"SC": "45", "SD": "46", "TN": "47", "TX": "48", "UT": "49",
|
||||
"VT": "50", "VA": "51", "WA": "53", "WV": "54", "WI": "55",
|
||||
"WY": "56", "PR": "72",
|
||||
}
|
||||
|
||||
@dataclass
|
||||
class EIADataset:
|
||||
dataset_id: str
|
||||
name: str
|
||||
category: str
|
||||
api_endpoint: str
|
||||
description: str = ""
|
||||
source_url: str = ""
|
||||
|
||||
|
||||
def slugify(value: str) -> str:
|
||||
"""Convert string to SQL-safe identifier."""
|
||||
cleaned = SAFE_RE.sub("_", value.lower()).strip("_")
|
||||
cleaned = re.sub(r"_+", "_", cleaned)
|
||||
return cleaned or "layer"
|
||||
|
||||
|
||||
def standardize_table_name(dataset_id: str) -> str:
|
||||
"""Generate standardized table name from dataset ID."""
|
||||
base = f"energy_eia_{slugify(dataset_id)}"
|
||||
if len(base) <= 55:
|
||||
return base
|
||||
digest = hashlib.md5(base.encode("utf-8")).hexdigest()[:8]
|
||||
return f"{base[:46]}_{digest}"
|
||||
|
||||
|
||||
def query_eia_api(endpoint: str, params: Optional[Dict[str, Any]] = None, extra_data_fields: Optional[List[str]] = None) -> Optional[Dict]:
|
||||
"""Query EIA API endpoint."""
|
||||
# EIA API uses /data/ suffix for data queries
|
||||
if not endpoint.endswith("/data"):
|
||||
endpoint = f"{endpoint}/data"
|
||||
|
||||
url = f"{EIA_API_BASE}/{endpoint}/"
|
||||
req_params = {"api_key": EIA_API_KEY, "length": 5000}
|
||||
if params:
|
||||
req_params.update(params)
|
||||
|
||||
# Add extra data fields (e.g., latitude, longitude) using EIA's array syntax
|
||||
if extra_data_fields:
|
||||
for i, field in enumerate(extra_data_fields):
|
||||
req_params[f"data[{i}]"] = field
|
||||
|
||||
try:
|
||||
resp = requests.get(url, params=req_params, timeout=60)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.RequestException as e:
|
||||
print(f" api error on {endpoint}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def fetch_eia_records(
|
||||
endpoint: str,
|
||||
max_records: int = 0,
|
||||
extra_data_fields: Optional[List[str]] = None,
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Fetch EIA records with pagination; retry without extra fields on unsupported endpoints."""
|
||||
page_size = 5000
|
||||
offset = 0
|
||||
records: List[Dict[str, Any]] = []
|
||||
used_extra_fields = extra_data_fields
|
||||
previous_first_row: Optional[str] = None
|
||||
|
||||
while True:
|
||||
params = {"offset": offset, "length": page_size}
|
||||
data = query_eia_api(endpoint, params=params, extra_data_fields=used_extra_fields)
|
||||
|
||||
# Some endpoints return 400 when requesting unsupported fields (e.g. lat/lon).
|
||||
if data is None and used_extra_fields:
|
||||
print(f" retrying {endpoint} without extra data fields")
|
||||
used_extra_fields = None
|
||||
data = query_eia_api(endpoint, params=params, extra_data_fields=None)
|
||||
|
||||
if not data:
|
||||
return None if offset == 0 else records
|
||||
|
||||
response = data.get("response", {})
|
||||
page_records = response.get("data", [])
|
||||
if not page_records:
|
||||
return records
|
||||
|
||||
# Some EIA endpoints ignore offset and repeat page 1 forever.
|
||||
# Detect repeated first row signature and stop pagination.
|
||||
first_row_sig = json.dumps(page_records[0], sort_keys=True, default=str)
|
||||
if previous_first_row is not None and first_row_sig == previous_first_row:
|
||||
return records
|
||||
previous_first_row = first_row_sig
|
||||
|
||||
records.extend(page_records)
|
||||
|
||||
total = response.get("total")
|
||||
try:
|
||||
total_int = int(total) if total is not None else None
|
||||
except (TypeError, ValueError):
|
||||
total_int = None
|
||||
|
||||
if max_records > 0 and len(records) >= max_records:
|
||||
return records[:max_records]
|
||||
|
||||
if total_int is not None and len(records) >= total_int:
|
||||
return records
|
||||
|
||||
if len(page_records) < page_size:
|
||||
return records
|
||||
|
||||
offset += len(page_records)
|
||||
|
||||
|
||||
def get_eia_datasets(category: str = "all") -> List[EIADataset]:
|
||||
"""Discover EIA datasets by category."""
|
||||
datasets = []
|
||||
cats = [category] if category != "all" else list(EIA_DATASETS.keys())
|
||||
|
||||
for cat in cats:
|
||||
if cat not in EIA_DATASETS:
|
||||
continue
|
||||
cat_info = EIA_DATASETS[cat]
|
||||
for endpoint in cat_info.get("endpoints", []):
|
||||
dataset_id = endpoint.replace("/", "_")
|
||||
datasets.append(
|
||||
EIADataset(
|
||||
dataset_id=dataset_id,
|
||||
name=endpoint.split("/")[-1],
|
||||
category=cat_info["category"],
|
||||
api_endpoint=endpoint,
|
||||
source_url=f"{EIA_API_BASE}/{endpoint}",
|
||||
)
|
||||
)
|
||||
|
||||
return datasets
|
||||
|
||||
|
||||
def parse_seed_line(line: str) -> Optional[EIADataset]:
|
||||
"""Parse stub - not used with EIA API."""
|
||||
return None
|
||||
|
||||
|
||||
def read_seed_file(seed_path: Path) -> List[EIADataset]:
|
||||
"""Stub - datasets discovered via EIA API catalog."""
|
||||
return []
|
||||
|
||||
|
||||
def classify_layer(text: str) -> Optional[str]:
|
||||
"""Stub - classification handled by get_eia_datasets()."""
|
||||
return None
|
||||
|
||||
|
||||
def import_layer_to_postgis(dataset: EIADataset, table_name: str, max_records: int = 0) -> bool:
|
||||
"""Import EIA dataset to PostGIS table."""
|
||||
conn = connect_db()
|
||||
try:
|
||||
# Check if this endpoint supports lat/lon geocoding
|
||||
extra_fields = EIA_GEOCODABLE_ENDPOINTS.get(dataset.api_endpoint)
|
||||
|
||||
# Query EIA API for data (with pagination), requesting lat/lon when supported.
|
||||
records = fetch_eia_records(
|
||||
dataset.api_endpoint,
|
||||
max_records=max_records,
|
||||
extra_data_fields=extra_fields,
|
||||
)
|
||||
if not records:
|
||||
print(f" no data returned")
|
||||
return False
|
||||
|
||||
# Create target table only once data is confirmed.
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS public.{table_name} (
|
||||
gid SERIAL PRIMARY KEY,
|
||||
geom GEOMETRY(GEOMETRY, 4326),
|
||||
properties JSONB
|
||||
)
|
||||
"""
|
||||
)
|
||||
# Truncate to avoid duplicates on re-runs
|
||||
cur.execute(f"TRUNCATE TABLE public.{table_name}")
|
||||
|
||||
# Insert records into PostGIS using psycopg2.
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
count = 0
|
||||
geo_count = 0
|
||||
for record in records:
|
||||
try:
|
||||
props_json = json.dumps(record)
|
||||
# Try to extract lat/lon for geometry
|
||||
lat = record.get("latitude") or record.get("lat")
|
||||
lon = record.get("longitude") or record.get("lon")
|
||||
try:
|
||||
lat = float(lat) if lat is not None else None
|
||||
lon = float(lon) if lon is not None else None
|
||||
except (TypeError, ValueError):
|
||||
lat = lon = None
|
||||
|
||||
if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO public.{table_name} (geom, properties)
|
||||
VALUES (ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
|
||||
""",
|
||||
(lon, lat, props_json),
|
||||
)
|
||||
geo_count += 1
|
||||
else:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO public.{table_name} (properties)
|
||||
VALUES (%s)
|
||||
""",
|
||||
(props_json,),
|
||||
)
|
||||
count += 1
|
||||
except Exception as e:
|
||||
print(f" row insert error: {e}")
|
||||
continue
|
||||
|
||||
geo_msg = f", {geo_count} with geometry" if extra_fields else ""
|
||||
print(f" inserted {count} features into {table_name}{geo_msg}")
|
||||
return count > 0
|
||||
except Exception as e:
|
||||
print(f" error: {e}")
|
||||
return False
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def connect_db():
|
||||
"""Connect to PostGIS database."""
|
||||
return psycopg2.connect(
|
||||
host=os.environ["PGWEB_HOST"],
|
||||
port=os.environ["PGWEB_PORT"],
|
||||
user=os.environ["PGWEB_USER"],
|
||||
password=os.environ["PGWEB_PASSWORD"],
|
||||
dbname=DB_NAME,
|
||||
)
|
||||
|
||||
|
||||
def ensure_source_catalog_table(conn):
|
||||
"""Create source catalog table if it doesn't exist."""
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
create table if not exists public.energy_atlas_layers_catalog (
|
||||
table_name text primary key,
|
||||
source_item_id text,
|
||||
source_type text,
|
||||
source_title text,
|
||||
source_owner text,
|
||||
source_url text,
|
||||
category text,
|
||||
imported_at timestamptz default now()
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def upsert_layer_catalog(conn, table_name: str, dataset: EIADataset, category: str):
|
||||
"""Upsert layer metadata into source catalog."""
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
insert into public.energy_atlas_layers_catalog (
|
||||
table_name,
|
||||
source_item_id,
|
||||
source_type,
|
||||
source_title,
|
||||
source_owner,
|
||||
source_url,
|
||||
category,
|
||||
imported_at
|
||||
)
|
||||
values (%s,%s,%s,%s,%s,%s,%s,now())
|
||||
on conflict (table_name) do update
|
||||
set source_item_id = excluded.source_item_id,
|
||||
source_type = excluded.source_type,
|
||||
source_title = excluded.source_title,
|
||||
source_owner = excluded.source_owner,
|
||||
source_url = excluded.source_url,
|
||||
category = excluded.category,
|
||||
imported_at = now()
|
||||
""",
|
||||
(
|
||||
table_name,
|
||||
dataset.dataset_id,
|
||||
"EIA API",
|
||||
dataset.name,
|
||||
"U.S. Energy Information Administration",
|
||||
dataset.source_url,
|
||||
category,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def add_geom_index_and_analyze(conn, table_name: str):
|
||||
"""Create spatial index and analyze table."""
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"create index if not exists {table_name}_geom_gix on public.{table_name} using gist (geom)"
|
||||
)
|
||||
cur.execute(f"analyze public.{table_name}")
|
||||
|
||||
|
||||
def table_geom_class(conn, table_name: str) -> Optional[str]:
|
||||
"""Get geometry type of table."""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
select st_geometrytype(geom)
|
||||
from public.{table_name}
|
||||
where geom is not null
|
||||
limit 1
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return (row[0] or "").lower()
|
||||
|
||||
|
||||
def reset_link_tables(conn):
|
||||
"""Recreate GEOID linkage tables."""
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f"drop table if exists {LINK_TABLE}")
|
||||
cur.execute(
|
||||
f"""
|
||||
create table {LINK_TABLE} (
|
||||
geoid text not null,
|
||||
source_table text not null,
|
||||
category text not null,
|
||||
feature_count integer,
|
||||
intersect_length_m numeric,
|
||||
intersect_area_sqm numeric,
|
||||
unique (geoid, source_table)
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def link_one_table(conn, table_name: str, category: str):
|
||||
"""Link infrastructure features to census tracts via spatial join or state FIPS fallback."""
|
||||
gclass = table_geom_class(conn, table_name)
|
||||
|
||||
# Check if table has a properties column (EIA API tables do; legacy tables may not)
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT count(*) FROM information_schema.columns "
|
||||
"WHERE table_schema='public' AND table_name=%s AND column_name='properties'",
|
||||
(table_name,),
|
||||
)
|
||||
has_properties = cur.fetchone()[0] > 0
|
||||
|
||||
# Check if table has any rows with stateid for state-level fallback
|
||||
stateid_count = 0
|
||||
if has_properties:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"SELECT count(*) FROM public.{table_name} WHERE geom IS NULL AND properties->>'stateid' IS NOT NULL"
|
||||
)
|
||||
stateid_count = cur.fetchone()[0]
|
||||
|
||||
if gclass:
|
||||
# Spatial join for rows that have geometry
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
if "point" in gclass:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO {LINK_TABLE} (
|
||||
geoid,
|
||||
source_table,
|
||||
category,
|
||||
feature_count,
|
||||
intersect_length_m,
|
||||
intersect_area_sqm
|
||||
)
|
||||
SELECT
|
||||
t.geoid,
|
||||
%s,
|
||||
%s,
|
||||
count(*)::integer,
|
||||
null,
|
||||
null
|
||||
FROM {TRACT_TABLE} t
|
||||
JOIN public.{table_name} s
|
||||
ON t.geom && s.geom
|
||||
AND st_covers(t.geom, s.geom)
|
||||
GROUP BY t.geoid
|
||||
""",
|
||||
(table_name, category),
|
||||
)
|
||||
elif "line" in gclass:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO {LINK_TABLE} (
|
||||
geoid,
|
||||
source_table,
|
||||
category,
|
||||
feature_count,
|
||||
intersect_length_m,
|
||||
intersect_area_sqm
|
||||
)
|
||||
SELECT
|
||||
t.geoid,
|
||||
%s,
|
||||
%s,
|
||||
count(*)::integer,
|
||||
sum(st_length(st_intersection(t.geom, s.geom)::geography)),
|
||||
null
|
||||
FROM {TRACT_TABLE} t
|
||||
JOIN public.{table_name} s
|
||||
ON t.geom && s.geom
|
||||
AND st_intersects(t.geom, s.geom)
|
||||
GROUP BY t.geoid
|
||||
""",
|
||||
(table_name, category),
|
||||
)
|
||||
elif "polygon" in gclass:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO {LINK_TABLE} (
|
||||
geoid,
|
||||
source_table,
|
||||
category,
|
||||
feature_count,
|
||||
intersect_length_m,
|
||||
intersect_area_sqm
|
||||
)
|
||||
SELECT
|
||||
t.geoid,
|
||||
%s,
|
||||
%s,
|
||||
count(*)::integer,
|
||||
null,
|
||||
sum(st_area(st_intersection(t.geom, s.geom)::geography))
|
||||
FROM {TRACT_TABLE} t
|
||||
JOIN public.{table_name} s
|
||||
ON t.geom && s.geom
|
||||
AND st_intersects(t.geom, s.geom)
|
||||
GROUP BY t.geoid
|
||||
""",
|
||||
(table_name, category),
|
||||
)
|
||||
|
||||
# State-level GEOID fallback: link rows without geometry using stateid → state FIPS prefix
|
||||
if stateid_count > 0:
|
||||
link_table_by_state(conn, table_name, category)
|
||||
|
||||
|
||||
def link_table_by_state(conn, table_name: str, category: str):
|
||||
"""Link EIA records to census tracts via state FIPS code prefix on GEOID.
|
||||
|
||||
EIA data has stateid (2-letter abbreviation). We map to FIPS and match
|
||||
all census tracts whose GEOID starts with that state FIPS code.
|
||||
Each state gets one link row with the count of records for that state.
|
||||
"""
|
||||
# Build a VALUES list for the state FIPS mapping
|
||||
fips_values = ", ".join(f"('{abbr}', '{fips}')" for abbr, fips in STATE_FIPS.items())
|
||||
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO {LINK_TABLE} (
|
||||
geoid,
|
||||
source_table,
|
||||
category,
|
||||
feature_count,
|
||||
intersect_length_m,
|
||||
intersect_area_sqm
|
||||
)
|
||||
SELECT
|
||||
t.geoid,
|
||||
%s,
|
||||
%s,
|
||||
state_counts.record_count::integer,
|
||||
null,
|
||||
null
|
||||
FROM (
|
||||
SELECT
|
||||
sf.fips AS state_fips,
|
||||
count(*) AS record_count
|
||||
FROM public.{table_name} s
|
||||
JOIN (VALUES {fips_values}) AS sf(abbr, fips)
|
||||
ON upper(s.properties->>'stateid') = sf.abbr
|
||||
WHERE s.geom IS NULL
|
||||
GROUP BY sf.fips
|
||||
) state_counts
|
||||
JOIN {TRACT_TABLE} t
|
||||
ON left(t.geoid, 2) = state_counts.state_fips
|
||||
ON CONFLICT DO NOTHING
|
||||
""",
|
||||
(table_name, category),
|
||||
)
|
||||
rows = cur.rowcount
|
||||
|
||||
if rows > 0:
|
||||
print(f" state-level geocoding linked {rows} tract rows for {table_name}")
|
||||
|
||||
|
||||
def build_summary_table(conn):
|
||||
"""Create GEOID summary aggregating all categories."""
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f"drop table if exists {SUMMARY_TABLE}")
|
||||
cur.execute(
|
||||
f"""
|
||||
create table {SUMMARY_TABLE} as
|
||||
select
|
||||
geoid,
|
||||
sum(feature_count) filter (where category = 'electric_grid')::integer
|
||||
as electric_grid_feature_count,
|
||||
sum(coalesce(intersect_length_m, 0)) filter (where category = 'electric_grid')
|
||||
as electric_grid_length_m,
|
||||
sum(coalesce(intersect_area_sqm, 0)) filter (where category = 'electric_grid')
|
||||
as electric_grid_area_sqm,
|
||||
sum(feature_count) filter (where category = 'power_plants')::integer
|
||||
as power_plant_feature_count,
|
||||
sum(feature_count) filter (where category = 'gas_infrastructure')::integer
|
||||
as gas_infrastructure_feature_count,
|
||||
sum(coalesce(intersect_length_m, 0)) filter (where category = 'gas_infrastructure')
|
||||
as gas_infrastructure_length_m,
|
||||
sum(coalesce(intersect_area_sqm, 0)) filter (where category = 'gas_infrastructure')
|
||||
as gas_infrastructure_area_sqm
|
||||
from {LINK_TABLE}
|
||||
group by geoid
|
||||
"""
|
||||
)
|
||||
cur.execute(f"alter table {SUMMARY_TABLE} add primary key (geoid)")
|
||||
cur.execute(
|
||||
f"create index energy_atlas_tract_link_geoid_idx on {LINK_TABLE} (geoid)"
|
||||
)
|
||||
cur.execute(
|
||||
f"create index energy_atlas_tract_link_category_idx on {LINK_TABLE} (category)"
|
||||
)
|
||||
cur.execute(f"analyze {LINK_TABLE}")
|
||||
cur.execute(f"analyze {SUMMARY_TABLE}")
|
||||
|
||||
|
||||
def parse_args():
|
||||
"""Parse command-line arguments."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Ingest EIA energy infrastructure data via EIA API into PostGIS with GEOID linking."
|
||||
)
|
||||
)
|
||||
parser.add_argument(
|
||||
"--category",
|
||||
choices=["electric", "power", "gas", "all"],
|
||||
default="all",
|
||||
help="Infrastructure category to ingest.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-records",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Cap on API records to process per dataset (0=all).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-ingest",
|
||||
action="store_true",
|
||||
help="Skip import; rebuild GEOID links/summary only.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--list-only",
|
||||
action="store_true",
|
||||
help="List selected datasets and exit.",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main ingestion pipeline."""
|
||||
args = parse_args()
|
||||
|
||||
if not EIA_API_KEY:
|
||||
print("ERROR: EIA_API_KEY not set. Export it and try again.")
|
||||
sys.exit(1)
|
||||
|
||||
# Discover EIA datasets by category
|
||||
datasets = get_eia_datasets(args.category)
|
||||
|
||||
if not datasets:
|
||||
raise RuntimeError(f"No datasets found for category '{args.category}'")
|
||||
|
||||
if args.max_records > 0:
|
||||
print(f"limiting to {args.max_records} records per dataset")
|
||||
|
||||
# Build ingest list with table names
|
||||
datasets_to_ingest = []
|
||||
for dataset in datasets:
|
||||
table_name = standardize_table_name(dataset.dataset_id)
|
||||
datasets_to_ingest.append((dataset, table_name, dataset.category))
|
||||
|
||||
if args.list_only:
|
||||
print(f"selected_datasets={len(datasets_to_ingest)}")
|
||||
for dataset, table_name, category in datasets_to_ingest:
|
||||
print(f"{category}\tpublic.{table_name}\t{dataset.name}\t{dataset.source_url}")
|
||||
return
|
||||
|
||||
if not datasets_to_ingest:
|
||||
raise RuntimeError("No datasets selected. Try --category all or --list-only.")
|
||||
|
||||
conn = connect_db()
|
||||
try:
|
||||
ensure_source_catalog_table(conn)
|
||||
|
||||
if not args.skip_ingest:
|
||||
for dataset, table_name, category in datasets_to_ingest:
|
||||
try:
|
||||
print(f"importing {dataset.name} -> public.{table_name} [{category}]")
|
||||
success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records)
|
||||
if success:
|
||||
upsert_layer_catalog(conn, table_name, dataset, category)
|
||||
add_geom_index_and_analyze(conn, table_name)
|
||||
except Exception as e:
|
||||
print(f" warning: import failed ({type(e).__name__}); skipping")
|
||||
continue
|
||||
|
||||
# Rebuild GEOID links from catalog.
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select table_name, category
|
||||
from public.energy_atlas_layers_catalog
|
||||
order by table_name
|
||||
"""
|
||||
)
|
||||
catalog_rows = cur.fetchall()
|
||||
|
||||
reset_link_tables(conn)
|
||||
for table_name, category in catalog_rows:
|
||||
print(f"linking public.{table_name} -> GEOID ({category})")
|
||||
link_one_table(conn, table_name, category)
|
||||
|
||||
build_summary_table(conn)
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select count(*) from public.energy_atlas_layers_catalog")
|
||||
catalog_count = cur.fetchone()[0]
|
||||
cur.execute(f"select count(*) from {LINK_TABLE}")
|
||||
link_count = cur.fetchone()[0]
|
||||
cur.execute(f"select count(*) from {SUMMARY_TABLE}")
|
||||
summary_count = cur.fetchone()[0]
|
||||
|
||||
print(
|
||||
f"\ndone: catalog_layers={catalog_count}, "
|
||||
f"geoid_links={link_count}, geoid_summary_rows={summary_count}"
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user