diff --git a/__pycache__/ingest_eia_energy_layers.cpython-314.pyc b/__pycache__/ingest_eia_energy_layers.cpython-314.pyc index 0756fad..fdbcaa1 100644 Binary files a/__pycache__/ingest_eia_energy_layers.cpython-314.pyc and b/__pycache__/ingest_eia_energy_layers.cpython-314.pyc differ diff --git a/ingest_eia_energy_layers.py b/ingest_eia_energy_layers.py index c82a6f0..0ff5b62 100644 --- a/ingest_eia_energy_layers.py +++ b/ingest_eia_energy_layers.py @@ -23,6 +23,7 @@ import json import os import re import sys +import time from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Dict, Any @@ -30,6 +31,7 @@ from typing import List, Optional, Dict, Any import psycopg2 import requests from psycopg2 import sql +from psycopg2.extras import execute_values DB_NAME = "data_centers" @@ -51,39 +53,28 @@ SAFE_RE = re.compile(r"[^a-z0-9_]+") # EIA dataset categories mapped to infrastructure types EIA_DATASETS = { - "electric": { - "category": "electric_grid", - "endpoints": [ - "electricity/electric-power-operational-data", - "electricity/rto/region-data", - ], - }, "power": { "category": "power_plants", "endpoints": [ + # Plant-level generation assets with coordinates: what + where. "electricity/operating-generator-capacity", + # Per-plant monthly net + gross generation (Form EIA-923): how much. "electricity/facility-fuel", ], }, - "gas": { - "category": "gas_infrastructure", - "endpoints": [ - "natural-gas/move/ist", - "natural-gas/stor/sum", - "petroleum/stoc", - ], - }, } -# EIA endpoints that support lat/lon data fields for plant-level geocoding -EIA_GEOCODABLE_ENDPOINTS = { +# Extra data fields (the EIA `data[N]=` query params) each endpoint needs. +# operating-generator-capacity returns only id columns by default; latitude/longitude +# must be requested explicitly. facility-fuel returns only id columns; generation +# values must be requested explicitly. +EIA_DATASET_DATA_FIELDS = { "electricity/operating-generator-capacity": ["latitude", "longitude"], - "electricity/facility-fuel": ["latitude", "longitude"], + "electricity/facility-fuel": ["generation", "gross-generation"], } # Endpoints that do not reliably support retry with ad-hoc data[] field requests. EIA_NO_RETRY_EXTRA_FIELDS = { - "electricity/facility-fuel", } # US state abbreviation to FIPS code mapping for state-level GEOID linking @@ -127,92 +118,207 @@ def standardize_table_name(dataset_id: str) -> str: return f"{base[:46]}_{digest}" -def query_eia_api(endpoint: str, params: Optional[Dict[str, Any]] = None, extra_data_fields: Optional[List[str]] = None) -> Optional[Dict]: - """Query EIA API endpoint.""" - # EIA API uses /data/ suffix for data queries +class EIAClientError(Exception): + """Non-retryable EIA API error (e.g. 400 for unsupported fields).""" + + +def iter_months(start: str, end: str): + """Yield 'YYYY-MM' strings from start to end inclusive.""" + sy, sm = (int(x) for x in start.split("-")) + ey, em = (int(x) for x in end.split("-")) + y, m = sy, sm + while (y, m) <= (ey, em): + yield f"{y:04d}-{m:02d}" + m += 1 + if m > 12: + m = 1 + y += 1 + + +def discover_period_range(endpoint: str) -> tuple: + """Return (earliest, latest) 'YYYY-MM' period strings for an endpoint. + + Forces frequency=monthly so endpoints that also publish annual/quarterly + series (e.g. facility-fuel) don't return non-monthly period formats that + break iter_months. Routes through query_eia_api for retry/backoff coverage. + """ + def _one(direction: str) -> str: + data = query_eia_api( + endpoint, + params={ + "length": 1, + "sort[0][column]": "period", + "sort[0][direction]": direction, + }, + query_params={"frequency": "monthly"}, + ) + rows = (data or {}).get("response", {}).get("data", []) + if not rows: + raise RuntimeError(f"no rows returned discovering period range for {endpoint}") + return rows[0]["period"] + + return _one("asc"), _one("desc") + + +def query_eia_api( + endpoint: str, + params: Optional[Dict[str, Any]] = None, + extra_data_fields: Optional[List[str]] = None, + query_params: Optional[Dict[str, Any]] = None, +) -> Optional[Dict]: + """Query EIA API endpoint with retry/backoff on transient errors. + + Returns parsed JSON on success. Raises EIAClientError on 4xx (caller + decides whether to retry without extra fields). Raises requests.RequestException + after exhausting retries on transient errors. + """ if not endpoint.endswith("/data"): endpoint = f"{endpoint}/data" - + url = f"{EIA_API_BASE}/{endpoint}/" - req_params = {"api_key": EIA_API_KEY, "length": 5000} + req_params: Dict[str, Any] = {"api_key": EIA_API_KEY, "length": 5000} + if query_params: + req_params.update(query_params) if params: req_params.update(params) - - # Add extra data fields (e.g., latitude, longitude) using EIA's array syntax + if extra_data_fields: for i, field in enumerate(extra_data_fields): req_params[f"data[{i}]"] = field - - try: - resp = requests.get(url, params=req_params, timeout=(10, 20)) - resp.raise_for_status() - return resp.json() - except requests.RequestException as e: - print(f" api error on {endpoint}: {e}") - return None + + max_attempts = 10 + base_backoff = 5.0 + max_backoff = 120.0 + last_exc: Optional[Exception] = None + for attempt in range(1, max_attempts + 1): + try: + resp = requests.get(url, params=req_params, timeout=(10, 120)) + if 400 <= resp.status_code < 500 and resp.status_code != 429: + raise EIAClientError(f"HTTP {resp.status_code} on {endpoint}: {resp.text[:200]}") + resp.raise_for_status() + return resp.json() + except EIAClientError: + raise + except (requests.Timeout, requests.ConnectionError, requests.HTTPError, ValueError) as e: + last_exc = e + if attempt == max_attempts: + break + sleep_s = min(base_backoff * (2 ** (attempt - 1)), max_backoff) + print(f" api error on {endpoint} (attempt {attempt}/{max_attempts}): {e}; retrying in {sleep_s:.0f}s") + time.sleep(sleep_s) + raise last_exc # type: ignore[misc] -def fetch_eia_records( +def fetch_eia_pages( endpoint: str, max_records: int = 0, extra_data_fields: Optional[List[str]] = None, -) -> Optional[List[Dict[str, Any]]]: - """Fetch EIA records with pagination; retry without extra fields on unsupported endpoints.""" + query_params: Optional[Dict[str, Any]] = None, +) -> Any: + """Yield paged EIA records; retry without extra fields on unsupported endpoints.""" page_size = 5000 offset = 0 - records: List[Dict[str, Any]] = [] + yielded = 0 used_extra_fields = extra_data_fields previous_first_row: Optional[str] = None while True: params = {"offset": offset, "length": page_size} - data = query_eia_api(endpoint, params=params, extra_data_fields=used_extra_fields) - - # Some endpoints return 400 when requesting unsupported fields (e.g. lat/lon). - if ( - data is None - and used_extra_fields - and endpoint not in EIA_NO_RETRY_EXTRA_FIELDS - ): - print(f" retrying {endpoint} without extra data fields") - used_extra_fields = None - data = query_eia_api(endpoint, params=params, extra_data_fields=None) + try: + data = query_eia_api( + endpoint, + params=params, + extra_data_fields=used_extra_fields, + query_params=query_params, + ) + except EIAClientError as e: + if used_extra_fields and endpoint not in EIA_NO_RETRY_EXTRA_FIELDS: + print(f" retrying {endpoint} without extra data fields ({e})") + used_extra_fields = None + data = query_eia_api( + endpoint, + params=params, + extra_data_fields=None, + query_params=query_params, + ) + else: + raise if not data: - return None if offset == 0 else records + return response = data.get("response", {}) page_records = response.get("data", []) if not page_records: - return records + return # Some EIA endpoints ignore offset and repeat page 1 forever. # Detect repeated first row signature and stop pagination. first_row_sig = json.dumps(page_records[0], sort_keys=True, default=str) if previous_first_row is not None and first_row_sig == previous_first_row: - return records + return previous_first_row = first_row_sig - records.extend(page_records) - total = response.get("total") try: total_int = int(total) if total is not None else None except (TypeError, ValueError): total_int = None - if max_records > 0 and len(records) >= max_records: - return records[:max_records] + if max_records > 0: + remaining = max_records - yielded + if remaining <= 0: + return + if len(page_records) > remaining: + page_records = page_records[:remaining] - if total_int is not None and len(records) >= total_int: - return records + yield page_records, used_extra_fields + yielded += len(page_records) + + if max_records > 0 and yielded >= max_records: + return + + if total_int is not None and yielded >= total_int: + return if len(page_records) < page_size: - return records + return offset += len(page_records) +def fetch_eia_pages_by_month( + endpoint: str, + earliest: str, + latest: str, + max_records: int = 0, + extra_data_fields: Optional[List[str]] = None, +) -> Any: + """Yield pages across months, querying one month at a time. + + EIA's bulk endpoints serve large offsets slowly and return frequent 503s + under sustained load. Filtering by &frequency=monthly&start=X&end=X keeps + each query small (~17k–28k rows per month for operating-generator-capacity) + and dramatically reduces failure rate and wall time. + """ + yielded = 0 + for month in iter_months(earliest, latest): + if max_records > 0 and yielded >= max_records: + return + remaining = max_records - yielded if max_records > 0 else 0 + month_params = {"frequency": "monthly", "start": month, "end": month} + for page_records, used_extra_fields in fetch_eia_pages( + endpoint, + max_records=remaining, + extra_data_fields=extra_data_fields, + query_params=month_params, + ): + yield page_records, used_extra_fields, month + yielded += len(page_records) + if max_records > 0 and yielded >= max_records: + return + + def get_eia_datasets(category: str = "all") -> List[EIADataset]: """Discover EIA datasets by category.""" datasets = [] @@ -256,79 +362,91 @@ def import_layer_to_postgis(dataset: EIADataset, table_name: str, max_records: i """Import EIA dataset to PostGIS table.""" conn = connect_db() try: - # Check if this endpoint supports lat/lon geocoding - extra_fields = EIA_GEOCODABLE_ENDPOINTS.get(dataset.api_endpoint) + extra_fields = EIA_DATASET_DATA_FIELDS.get(dataset.api_endpoint) - # Query EIA API for data (with pagination), requesting lat/lon when supported. - records = fetch_eia_records( + earliest, latest = discover_period_range(dataset.api_endpoint) + print(f" period range: {earliest} -> {latest}") + + count = 0 + geo_count = 0 + initialized = False + current_month: Optional[str] = None + + for page_records, used_extra_fields, month in fetch_eia_pages_by_month( dataset.api_endpoint, + earliest=earliest, + latest=latest, max_records=max_records, extra_data_fields=extra_fields, - ) - if not records: - print(f" no data returned") + ): + if month != current_month: + if current_month is not None: + print(f" progress: {count} rows ingested through {current_month}") + current_month = month + if not initialized: + with conn: + with conn.cursor() as cur: + cur.execute( + f""" + CREATE TABLE IF NOT EXISTS public.{table_name} ( + gid SERIAL PRIMARY KEY, + geom GEOMETRY(GEOMETRY, 4326), + properties JSONB + ) + """ + ) + cur.execute(f"TRUNCATE TABLE public.{table_name}") + initialized = True + + geom_rows = [] + prop_rows = [] + for record in page_records: + props_json = json.dumps(record) + lat = record.get("latitude") or record.get("lat") + lon = record.get("longitude") or record.get("lon") + try: + lat = float(lat) if lat is not None else None + lon = float(lon) if lon is not None else None + except (TypeError, ValueError): + lat = lon = None + + if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180: + geom_rows.append((lon, lat, props_json)) + geo_count += 1 + else: + prop_rows.append((props_json,)) + count += 1 + + with conn: + with conn.cursor() as cur: + if geom_rows: + execute_values( + cur, + f"INSERT INTO public.{table_name} (geom, properties) VALUES %s", + geom_rows, + template="(ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)", + page_size=1000, + ) + if prop_rows: + execute_values( + cur, + f"INSERT INTO public.{table_name} (properties) VALUES %s", + prop_rows, + template="(%s)", + page_size=1000, + ) + + # Track if API ended up running without extra fields after retry. + if used_extra_fields is None: + extra_fields = None + + if not initialized: + print(" no data returned") return False - # Create target table only once data is confirmed. - with conn: - with conn.cursor() as cur: - cur.execute( - f""" - CREATE TABLE IF NOT EXISTS public.{table_name} ( - gid SERIAL PRIMARY KEY, - geom GEOMETRY(GEOMETRY, 4326), - properties JSONB - ) - """ - ) - # Truncate to avoid duplicates on re-runs - cur.execute(f"TRUNCATE TABLE public.{table_name}") - - # Insert records into PostGIS using psycopg2. - with conn: - with conn.cursor() as cur: - count = 0 - geo_count = 0 - for record in records: - try: - props_json = json.dumps(record) - # Try to extract lat/lon for geometry - lat = record.get("latitude") or record.get("lat") - lon = record.get("longitude") or record.get("lon") - try: - lat = float(lat) if lat is not None else None - lon = float(lon) if lon is not None else None - except (TypeError, ValueError): - lat = lon = None - - if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180: - cur.execute( - f""" - INSERT INTO public.{table_name} (geom, properties) - VALUES (ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s) - """, - (lon, lat, props_json), - ) - geo_count += 1 - else: - cur.execute( - f""" - INSERT INTO public.{table_name} (properties) - VALUES (%s) - """, - (props_json,), - ) - count += 1 - except Exception as e: - print(f" row insert error: {e}") - continue - geo_msg = f", {geo_count} with geometry" if extra_fields else "" print(f" inserted {count} features into {table_name}{geo_msg}") return count > 0 - except Exception as e: - print(f" error: {e}") - return False finally: try: conn.close() @@ -652,6 +770,153 @@ def build_summary_table(conn): cur.execute(f"analyze {SUMMARY_TABLE}") +def build_flat_tables(conn): + """Create analyst-friendly flat tables from JSON properties.""" + with conn.cursor() as cur: + cur.execute( + """ + select table_name + from information_schema.tables + where table_schema='public' + and table_name in ( + 'energy_eia_electricity_operating_generator_capacity', + 'energy_eia_electricity_facility_fuel' + ) + """ + ) + available = {row[0] for row in cur.fetchall()} + + with conn: + with conn.cursor() as cur: + cur.execute("drop table if exists public.energy_eia_electric_power_operational_data_flat") + cur.execute("drop table if exists public.energy_eia_rto_region_data_flat") + + if "energy_eia_electricity_operating_generator_capacity" in available: + cur.execute("drop table if exists public.energy_eia_operating_generator_capacity_flat") + # EIA stored lower-48 longitudes as positive numbers for periods + # 2008-01 through 2010-11 (~600k rows). The negative sign is + # restored here for any state other than AK (Alaska legitimately + # has Aleutian plants east of the dateline with positive lons). + # geom is rebuilt from the corrected coordinates so the source + # table's pre-correction geometry is discarded. + cur.execute( + r""" + create table public.energy_eia_operating_generator_capacity_flat as + with parsed as ( + select + gid, + properties, + properties->>'stateid' as state_id_raw, + case + when (properties->>'latitude') ~ '^-?[0-9]+(\.[0-9]+)?$' + then (properties->>'latitude')::double precision + end as latitude_raw, + case + when (properties->>'longitude') ~ '^-?[0-9]+(\.[0-9]+)?$' + then (properties->>'longitude')::double precision + end as longitude_raw + from public.energy_eia_electricity_operating_generator_capacity + ), + fixed as ( + select + *, + case + when longitude_raw > 0 and state_id_raw <> 'AK' + then -longitude_raw + else longitude_raw + end as longitude_fixed + from parsed + ) + select + gid, + case + when latitude_raw is not null + and longitude_fixed is not null + and latitude_raw between -90 and 90 + and longitude_fixed between -180 and 180 + then st_setsrid(st_makepoint(longitude_fixed, latitude_raw), 4326) + end as geom, + properties->>'period' as period, + properties->>'plantid' as plant_id, + properties->>'plantName' as plant_name, + state_id_raw as state_id, + properties->>'stateName' as state_name, + properties->>'entityid' as entity_id, + properties->>'entityName' as entity_name, + properties->>'generatorid' as generator_id, + properties->>'status' as status, + properties->>'sector' as sector, + properties->>'sectorName' as sector_name, + properties->>'energy_source_code' as energy_source_code, + properties->>'energy-source-desc' as energy_source_desc, + properties->>'prime_mover_code' as prime_mover_code, + properties->>'balancing_authority_code' as balancing_authority_code, + properties->>'balancing-authority-name' as balancing_authority_name, + latitude_raw as latitude, + longitude_fixed as longitude, + properties as raw_properties + from fixed + """ + ) + cur.execute( + "create index energy_eia_operating_generator_capacity_flat_geom_gix " + "on public.energy_eia_operating_generator_capacity_flat using gist (geom)" + ) + cur.execute( + "create index energy_eia_operating_generator_capacity_flat_plant_id_idx " + "on public.energy_eia_operating_generator_capacity_flat (plant_id)" + ) + cur.execute( + "create index energy_eia_operating_generator_capacity_flat_state_id_idx " + "on public.energy_eia_operating_generator_capacity_flat (state_id)" + ) + cur.execute("analyze public.energy_eia_operating_generator_capacity_flat") + + if "energy_eia_electricity_facility_fuel" in available: + cur.execute("drop table if exists public.energy_eia_facility_fuel_flat") + cur.execute( + r""" + create table public.energy_eia_facility_fuel_flat as + select + gid, + properties->>'period' as period, + coalesce(properties->>'plantCode', properties->>'plantid') as plant_id, + properties->>'plantName' as plant_name, + properties->>'state' as state_id, + properties->>'stateDescription' as state_name, + properties->>'primeMover' as prime_mover_code, + properties->>'primeMoverDescription' as prime_mover_desc, + properties->>'fuel2002' as energy_source_code, + properties->>'fuel2002Description' as energy_source_desc, + case + when (properties->>'generation') ~ '^-?[0-9]+(\.[0-9]+)?$' + then (properties->>'generation')::double precision + else null + end as generation_mwh, + case + when (properties->>'gross-generation') ~ '^-?[0-9]+(\.[0-9]+)?$' + then (properties->>'gross-generation')::double precision + else null + end as gross_generation_mwh, + properties as raw_properties + from public.energy_eia_electricity_facility_fuel + """ + ) + cur.execute( + "create index energy_eia_facility_fuel_flat_plant_id_idx " + "on public.energy_eia_facility_fuel_flat (plant_id)" + ) + cur.execute( + "create index energy_eia_facility_fuel_flat_period_idx " + "on public.energy_eia_facility_fuel_flat (period)" + ) + cur.execute( + "create index energy_eia_facility_fuel_flat_state_id_idx " + "on public.energy_eia_facility_fuel_flat (state_id)" + ) + cur.execute("analyze public.energy_eia_facility_fuel_flat") + + def prune_stale_layer_versions(conn) -> int: """Drop superseded EIA layer tables and remove stale catalog rows. @@ -718,6 +983,90 @@ def prune_stale_layer_versions(conn) -> int: return pruned +def prune_unselected_layers(conn, selected_table_names: List[str]) -> int: + """Drop catalog/table entries that are not in the currently selected dataset set.""" + selected = set(selected_table_names) + with conn.cursor() as cur: + cur.execute( + """ + select table_name + from public.energy_atlas_layers_catalog + where table_name like 'energy_eia_%' + """ + ) + existing = [row[0] for row in cur.fetchall()] + + to_remove = [name for name in existing if name not in selected] + removed = 0 + + with conn: + with conn.cursor() as cur: + for table_name in to_remove: + cur.execute( + """ + select exists ( + select 1 + from information_schema.tables + where table_schema='public' and table_name=%s + ) + """, + (table_name,), + ) + table_exists = cur.fetchone()[0] + + if table_exists: + cur.execute( + sql.SQL("drop table if exists public.{} cascade").format( + sql.Identifier(table_name) + ) + ) + print(f"pruned unselected table public.{table_name}") + + cur.execute( + "delete from public.energy_atlas_layers_catalog where table_name = %s", + (table_name,), + ) + removed += 1 + + return removed + + +FINAL_FLAT_TABLES = ( + "energy_eia_operating_generator_capacity_flat", + "energy_eia_facility_fuel_flat", +) + + +def keep_only_target_flat_table(conn) -> int: + """Drop all energy_eia_* tables except the final flat tables.""" + with conn.cursor() as cur: + cur.execute( + """ + select table_name + from information_schema.tables + where table_schema='public' + and table_name like 'energy_eia_%%' + and table_name <> ALL(%s) + """, + (list(FINAL_FLAT_TABLES),), + ) + to_drop = [row[0] for row in cur.fetchall()] + + dropped = 0 + with conn: + with conn.cursor() as cur: + for table_name in to_drop: + cur.execute( + sql.SQL("drop table if exists public.{} cascade").format( + sql.Identifier(table_name) + ) + ) + print(f"dropped non-target table public.{table_name}") + dropped += 1 + + return dropped + + def parse_args(): """Parse command-line arguments.""" parser = argparse.ArgumentParser( @@ -727,8 +1076,8 @@ def parse_args(): ) parser.add_argument( "--category", - choices=["electric", "power", "gas", "all"], - default="all", + choices=["power", "all"], + default="power", help="Infrastructure category to ingest.", ) parser.add_argument( @@ -737,6 +1086,16 @@ def parse_args(): default=0, help="Cap on API records to process per dataset (0=all).", ) + parser.add_argument( + "--endpoint", + action="append", + default=None, + help=( + "Limit ingest to specific EIA endpoint(s). " + "Repeatable. Substring match against api_endpoint. " + "Other datasets are skipped (not re-ingested, not pruned)." + ), + ) parser.add_argument( "--skip-ingest", action="store_true", @@ -775,6 +1134,8 @@ def main(): # Build ingest list with table names datasets_to_ingest = [] for dataset in datasets: + if args.endpoint and not any(filt in dataset.api_endpoint for filt in args.endpoint): + continue table_name = standardize_table_name(dataset.dataset_id) datasets_to_ingest.append((dataset, table_name, dataset.category)) @@ -793,69 +1154,49 @@ def main(): if not args.skip_ingest: for dataset, table_name, category in datasets_to_ingest: - try: - print(f"importing {dataset.name} -> public.{table_name} [{category}]") - success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records) - if success: - upsert_layer_catalog(conn, table_name, dataset, category) - add_geom_index_and_analyze(conn, table_name) - except Exception as e: - print(f" warning: import failed ({type(e).__name__}); skipping") - continue + print(f"importing {dataset.name} -> public.{table_name} [{category}]") + success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records) + if success: + upsert_layer_catalog(conn, table_name, dataset, category) + add_geom_index_and_analyze(conn, table_name) - if not args.keep_stale_tables: - pruned = prune_stale_layer_versions(conn) - if pruned > 0: - print(f"pruned stale layer versions: {pruned}") - - # Rebuild GEOID links from catalog. - with conn.cursor() as cur: - cur.execute( - """ - with ranked as ( - select - c.table_name, - c.category, - row_number() over ( - partition by coalesce( - nullif(regexp_replace(c.source_url, '/data/?$', ''), ''), - nullif(c.source_item_id, ''), - c.table_name - ) - order by c.imported_at desc, c.table_name desc - ) as rn - from public.energy_atlas_layers_catalog c - join information_schema.tables t - on t.table_schema = 'public' - and t.table_name = c.table_name - ) - select table_name, category - from ranked - where rn = 1 - order by table_name - """ - ) - catalog_rows = cur.fetchall() + # Pruning compares against the *full* selected set; skip when --endpoint + # is narrowing the run, otherwise we'd drop catalog entries for endpoints + # we deliberately chose not to touch. + if not args.endpoint: + selected_table_names = [table_name for _, table_name, _ in datasets_to_ingest] + removed = prune_unselected_layers(conn, selected_table_names) + if removed > 0: + print(f"pruned unselected layers: {removed}") - reset_link_tables(conn) - for table_name, category in catalog_rows: - print(f"linking public.{table_name} -> GEOID ({category})") - link_one_table(conn, table_name, category) + if not args.keep_stale_tables: + pruned = prune_stale_layer_versions(conn) + if pruned > 0: + print(f"pruned stale layer versions: {pruned}") - build_summary_table(conn) + build_flat_tables(conn) + dropped_non_target = keep_only_target_flat_table(conn) + if dropped_non_target > 0: + print(f"dropped non-target energy tables: {dropped_non_target}") with conn.cursor() as cur: cur.execute("select count(*) from public.energy_atlas_layers_catalog") catalog_count = cur.fetchone()[0] - cur.execute(f"select count(*) from {LINK_TABLE}") - link_count = cur.fetchone()[0] - cur.execute(f"select count(*) from {SUMMARY_TABLE}") - summary_count = cur.fetchone()[0] + counts = {} + for tbl in FINAL_FLAT_TABLES: + cur.execute( + "select to_regclass(%s) is not null", + (f"public.{tbl}",), + ) + if cur.fetchone()[0]: + cur.execute(f"select count(*) from public.{tbl}") + counts[tbl] = cur.fetchone()[0] + else: + counts[tbl] = None - print( - f"\ndone: catalog_layers={catalog_count}, " - f"geoid_links={link_count}, geoid_summary_rows={summary_count}" - ) + print(f"\ndone: catalog_layers={catalog_count}") + for tbl, n in counts.items(): + print(f" {tbl}: {n if n is not None else 'missing'} rows") finally: conn.close() diff --git a/output/facility_fuel_pending_narrative.txt b/output/facility_fuel_pending_narrative.txt new file mode 100644 index 0000000..254942d --- /dev/null +++ b/output/facility_fuel_pending_narrative.txt @@ -0,0 +1,132 @@ +================================================================================ +EIA Facility-Fuel — Pending Dataset Narrative +Drafted 2026-05-16, prior to first successful ingest +================================================================================ + +STATUS +------ +Wired into the weekly ingest pipeline as of 2026-05-16, but not yet +populated. EIA's facility-fuel endpoint and its parent EIA-923 service +were experiencing a sustained outage at write time (network-level +connection timeouts, also visible on EIA's public dashboard). The +endpoint is queued for the next successful systemd run (Monday 03:30, +or sooner if EIA recovers). + +Target table when populated: public.energy_eia_facility_fuel_flat + +WHAT THIS DATA IS +----------------- +The "facility-fuel" endpoint +(https://api.eia.gov/v2/electricity/facility-fuel/) exposes Form EIA-923: +the monthly survey collected from electric power plants reporting their +fuel consumption and electricity output. Where operating-generator-capacity +tells us WHAT generators exist and WHERE they are, facility-fuel tells us +HOW MUCH electricity each plant actually produced each month. + +Each row represents one (plant × energy source × prime mover × month) +combination. A coal-gas hybrid plant with both steam turbines and +combustion turbines, for example, would have multiple rows per month — +one for each fuel/prime-mover combination it ran during that month. + +WHAT IT TELLS US (PLANNED COLUMNS) +---------------------------------- +For each plant, in each reporting month: + + period YYYY-MM reporting month + plant_id EIA plant code — joins to operating_generator_capacity_flat + plant_name Plant name (when present) + state_id Two-letter state + state_name Full state name (when present) + prime_mover_code ST=steam, CT=combustion, HY=hydro, etc. + prime_mover_desc Human-readable prime mover + energy_source_code EIA fuel code (e.g., NG=natural gas, BIT=bituminous coal) + energy_source_desc Human-readable fuel + generation_mwh NET generation in megawatt-hours (after plant use) + gross_generation_mwh GROSS generation in megawatt-hours (at the busbar) + raw_properties Full JSONB of the EIA response row (safety net) + +The two MWh fields are the headline numbers — actual electricity output. + +WHY BOTH TABLES MATTER +---------------------- +The capacity table answers "what generators exist and where," but a +generator that exists is not the same as a generator that produces. A +1,000 MW coal plant in standby status produces zero MWh; a 100 MW solar +farm at noon produces near its nameplate. Capacity sets the upper bound; +facility-fuel reports the realized output. + +For data-center analyses specifically, this matters because: + + - Siting decisions correlate with available local generation. The + capacity table shows nearby supply potential. The facility-fuel + table shows whether that potential is actually being realized + month-to-month (e.g., a nearby gas plant that runs only as peaker + is a very different story from one running baseload). + + - Carbon intensity per data center can be estimated by attributing + nearby generation MWh to fuel type, weighted by distance or + balancing-authority membership. + + - Grid stress signals (capacity utilization = generation / capacity) + flag regions where new data-center load may be unwelcome. + +JOIN PATTERN +------------ +The natural join key is plant_id (text). Typical analyst query: + + select + cap.plant_name, + cap.state_id, + cap.entity_name, + cap.latitude, + cap.longitude, + ff.period, + ff.energy_source_desc, + ff.generation_mwh, + ff.gross_generation_mwh + from public.energy_eia_facility_fuel_flat ff + join public.energy_eia_operating_generator_capacity_flat cap + on cap.plant_id = ff.plant_id + and cap.period = ff.period + where ff.period = '2026-01'; + +Note: capacity rows are per-generator; facility-fuel rows are per +plant × fuel × prime mover. A join on plant_id alone will multiply rows. +For most aggregate questions, aggregate one side first (e.g., sum MWh +per plant-month, or pick a representative generator per plant). + +EXPECTED SIZE +------------- +Form EIA-923 monthly publishes back to 2001-01. With ~10,000 reporting +plants and multiple fuel/prime-mover combinations per plant per month, +the table is expected in the 5–10 million row range — similar to or +somewhat larger than the capacity table. The per-month ingest strategy +(start=YYYY-MM&end=YYYY-MM, retry/backoff) is identical to the capacity +ingest and was chosen specifically because it kept that table's wall +time near two hours and recovered cleanly from EIA's transient 503s. + +UNKNOWNS AT TIME OF DRAFT +------------------------- +The flat-table SELECT was written from EIA's API documentation without +confirmation of the exact JSON key casing returned by the live endpoint +(the documentation lists facets as plantCode, fuel2002, primeMover, state +— the SELECT uses these names). If the live response differs (e.g., +plantid vs plantCode), the typed columns will populate as NULL for +those rows, and the full original payload will still be available in +raw_properties for inspection. The fix in that case is a one-line edit +to the SELECT in build_flat_tables() in ingest_eia_energy_layers.py. + +OPERATIONAL NOTES +----------------- + - Runs in the same weekly systemd job as operating-generator-capacity, + sequentially after it (Monday 03:30 via + ingest-eia-energy-layers.timer). + + - Both tables are rebuilt from scratch each run (TRUNCATE on first + page), so historical revisions EIA pushes upstream propagate + automatically. There is no incremental-load mode and none is + planned — total wall time is acceptable. + + - If EIA-923 is down at run time, the wrapper's `set -e` will mark + the systemd service as failed; the capacity ingest will still have + completed successfully because it runs first. diff --git a/output/operating_generator_capacity_sample.txt b/output/operating_generator_capacity_sample.txt new file mode 100644 index 0000000..4163e49 --- /dev/null +++ b/output/operating_generator_capacity_sample.txt @@ -0,0 +1,134 @@ +================================================================================ +EIA Operating Generator Capacity — Sample Rows + Narrative +Generated 2026-05-16 from public.energy_eia_operating_generator_capacity_flat +================================================================================ + +WHAT THIS DATA IS +----------------- +This table is a flat, queryable view of EIA's "operating-generator-capacity" +endpoint (https://api.eia.gov/v2/electricity/operating-generator-capacity/). +The underlying source is Form EIA-860, which inventories every electric +generator in the United States that is reported as operating (or recently +operating) by its owner. + +Each row represents one generator's reported status in one month. A single +power plant typically has multiple generators, so a plant like Plant Barry in +Alabama appears as several rows per month — one for each generator unit +(generator_id 1, 2, 3, ...). The same generator reappears every month it +remains in the inventory, so the table is a time series of (plant × generator +× month) records. + +WHAT IT TELLS US +---------------- +For each generator, in each reporting month: + - Where it is (state, balancing authority, exact latitude/longitude) + - Who owns or operates it (entity_id, entity_name) + - What fuel/energy source it uses (energy_source_code + descriptive name) + - How it generates electricity (prime_mover_code, e.g. ST=steam turbine, + HY=hydro, IC=internal combustion, WT=wind turbine) + - Its current operating status (status code, see below) + - What sector it serves (utility, IPP, industrial, commercial, etc.) + +What it does NOT tell us is how much electricity the generator actually +produces in that month — that data comes from a separate EIA endpoint +("facility-fuel", Form EIA-923), captured in a sibling table. + +STATUS CODES IN THIS TABLE +-------------------------- + OP Operating 4,229,083 rows + SB Standby / backup 339,057 rows + OS Out of service 99,816 rows + OA Out of service (annual) 28,769 rows + +SUMMARY STATISTICS +------------------ + Total rows: 4,696,725 + Distinct generators (by plant_id × generator_id): ~75k + Distinct plants (plant_id): 15,791 + Distinct states/territories: 51 + Distinct months covered: 218 + Period range: 2008-01 → 2026-02 + Rows with lat/lon geometry: 4,685,500 (99.76%) + Distinct fuel codes: 38 + +TOP 10 FUELS BY ROW COUNT +------------------------- + Natural Gas 1,301,782 + Water (hydro) 908,741 + Distillate Fuel Oil* 767,207 + Solar 624,113 + Landfill Gas 317,709 + Wind 245,214 + Bituminous Coal 108,352 + Subbituminous Coal 75,587 + Electricity used for energy storage 43,833 + Geothermal 41,066 + + * EIA stores this as "Disillate Fuel Oil" (sic). The misspelling is in + EIA's source data, not introduced by ingest. Preserved verbatim. + +FIRST 5 ROWS (earliest period, ordered by plant_id) +--------------------------------------------------- + period | plant_id | plant_name | state | entity_name | gen_id | status | fuel | pm | latitude | longitude +---------+----------+--------------+-------+------------------+--------+--------+------------------+----+-----------+----------- + 2008-01 | 2 | Bankhead Dam | AL | Alabama Power Co | 1 | OP | Water | HY | 33.218889 | -87.579722 + 2008-01 | 3 | Barry | AL | Alabama Power Co | 1 | OP | Bituminous Coal | ST | 31.004167 | -88.013889 + 2008-01 | 3 | Barry | AL | Alabama Power Co | 2 | OP | Bituminous Coal | ST | 31.004167 | -88.013889 + 2008-01 | 3 | Barry | AL | Alabama Power Co | 3 | OP | Bituminous Coal | ST | 31.004167 | -88.013889 + 2008-01 | 3 | Barry | AL | Alabama Power Co | 4 | OP | Bituminous Coal | ST | 31.004167 | -88.013889 + +(Both plants are in Alabama; Bankhead Dam is a hydro facility on the Black +Warrior River, Plant Barry is a coal-fired steam plant near Mobile. Both +were operating in January 2008.) + +LAST 5 ROWS (latest period, ordered by plant_id) +------------------------------------------------ + period | plant_id | plant_name | state | entity_name | gen_id | status | fuel | pm | latitude | longitude +---------+----------+------------+-------+----------------------------+--------+--------+---------------------+----+-----------+------------- + 2026-02 | 1 | Sand Point | AK | Sand Point Generating, LLC | 1 | SB | Disillate Fuel Oil | IC | 55.339722 | -160.497222 + 2026-02 | 1 | Sand Point | AK | Sand Point Generating, LLC | 2 | OP | Disillate Fuel Oil | IC | 55.339722 | -160.497222 + 2026-02 | 1 | Sand Point | AK | Sand Point Generating, LLC | 3 | OP | Disillate Fuel Oil | IC | 55.339722 | -160.497222 + 2026-02 | 1 | Sand Point | AK | Sand Point Generating, LLC | 5.1 | OP | Disillate Fuel Oil | IC | 55.339722 | -160.497222 + 2026-02 | 1 | Sand Point | AK | Sand Point Generating, LLC | WT1 | OS | Wind | WT | 55.339722 | -160.497222 + +(Sand Point is a small remote-Alaska community station with five generators: +four diesel internal-combustion units and one wind turbine. The wind turbine +is currently out of service.) + +KNOWN DATA-QUALITY QUIRKS IN EIA'S SOURCE DATA +---------------------------------------------- + - Historical longitude sign bug (FIXED at ingest time, 2026-05-16). + For reporting periods 2008-01 through 2010-11, EIA stored lower-48 + longitudes as positive numbers (Bankhead Dam was +87.579722 instead + of -87.579722). EIA cleaned this up in their own data starting + 2010-12, but the historical periods still had the bug. The flat + table's build step now applies: + + CASE WHEN longitude > 0 AND state_id <> 'AK' + THEN -longitude ELSE longitude END + + and rebuilds geom from the corrected coordinates. Alaska is + excluded because some Aleutian plants (~11k bug-era rows) are + legitimately east of the dateline with positive longitudes. + Affected non-AK rows fixed: 403,558. After the fix, every plant + in the table is at a geographically plausible US location. + + - Fuel description "Disillate Fuel Oil" (missing 't', should be + "Distillate") — EIA's spelling, preserved as-is in energy_source_desc. + +REFRESH CADENCE +--------------- +A systemd user timer rebuilds this table every Monday at 03:30 local time +via ~/.local/bin/ingest-eia-energy-layers-weekly. The ingest fetches the +full dataset per month (Jan 2008 → current) and rebuilds the flat table +from scratch each run. + +JOIN KEY FOR DOWNSTREAM ANALYSIS +-------------------------------- +plant_id (text) joins to the forthcoming energy_eia_facility_fuel_flat +table (Form EIA-923), which provides monthly net + gross generation in MWh +for the same plants. Together, the two tables answer: + + - WHERE energy is generated (this table, with lat/lon) + - WHAT is generated and by whom (this table, with fuel + entity) + - HOW MUCH is generated each month (facility_fuel_flat, in MWh)