got the ingest for energy eia data. created txt files of their descriptions
This commit is contained in:
@@ -23,6 +23,7 @@ import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
@@ -30,6 +31,7 @@ from typing import List, Optional, Dict, Any
|
||||
import psycopg2
|
||||
import requests
|
||||
from psycopg2 import sql
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
|
||||
DB_NAME = "data_centers"
|
||||
@@ -51,39 +53,28 @@ SAFE_RE = re.compile(r"[^a-z0-9_]+")
|
||||
|
||||
# EIA dataset categories mapped to infrastructure types
|
||||
EIA_DATASETS = {
|
||||
"electric": {
|
||||
"category": "electric_grid",
|
||||
"endpoints": [
|
||||
"electricity/electric-power-operational-data",
|
||||
"electricity/rto/region-data",
|
||||
],
|
||||
},
|
||||
"power": {
|
||||
"category": "power_plants",
|
||||
"endpoints": [
|
||||
# Plant-level generation assets with coordinates: what + where.
|
||||
"electricity/operating-generator-capacity",
|
||||
# Per-plant monthly net + gross generation (Form EIA-923): how much.
|
||||
"electricity/facility-fuel",
|
||||
],
|
||||
},
|
||||
"gas": {
|
||||
"category": "gas_infrastructure",
|
||||
"endpoints": [
|
||||
"natural-gas/move/ist",
|
||||
"natural-gas/stor/sum",
|
||||
"petroleum/stoc",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
# EIA endpoints that support lat/lon data fields for plant-level geocoding
|
||||
EIA_GEOCODABLE_ENDPOINTS = {
|
||||
# Extra data fields (the EIA `data[N]=` query params) each endpoint needs.
|
||||
# operating-generator-capacity returns only id columns by default; latitude/longitude
|
||||
# must be requested explicitly. facility-fuel returns only id columns; generation
|
||||
# values must be requested explicitly.
|
||||
EIA_DATASET_DATA_FIELDS = {
|
||||
"electricity/operating-generator-capacity": ["latitude", "longitude"],
|
||||
"electricity/facility-fuel": ["latitude", "longitude"],
|
||||
"electricity/facility-fuel": ["generation", "gross-generation"],
|
||||
}
|
||||
|
||||
# Endpoints that do not reliably support retry with ad-hoc data[] field requests.
|
||||
EIA_NO_RETRY_EXTRA_FIELDS = {
|
||||
"electricity/facility-fuel",
|
||||
}
|
||||
|
||||
# US state abbreviation to FIPS code mapping for state-level GEOID linking
|
||||
@@ -127,92 +118,207 @@ def standardize_table_name(dataset_id: str) -> str:
|
||||
return f"{base[:46]}_{digest}"
|
||||
|
||||
|
||||
def query_eia_api(endpoint: str, params: Optional[Dict[str, Any]] = None, extra_data_fields: Optional[List[str]] = None) -> Optional[Dict]:
|
||||
"""Query EIA API endpoint."""
|
||||
# EIA API uses /data/ suffix for data queries
|
||||
class EIAClientError(Exception):
|
||||
"""Non-retryable EIA API error (e.g. 400 for unsupported fields)."""
|
||||
|
||||
|
||||
def iter_months(start: str, end: str):
|
||||
"""Yield 'YYYY-MM' strings from start to end inclusive."""
|
||||
sy, sm = (int(x) for x in start.split("-"))
|
||||
ey, em = (int(x) for x in end.split("-"))
|
||||
y, m = sy, sm
|
||||
while (y, m) <= (ey, em):
|
||||
yield f"{y:04d}-{m:02d}"
|
||||
m += 1
|
||||
if m > 12:
|
||||
m = 1
|
||||
y += 1
|
||||
|
||||
|
||||
def discover_period_range(endpoint: str) -> tuple:
|
||||
"""Return (earliest, latest) 'YYYY-MM' period strings for an endpoint.
|
||||
|
||||
Forces frequency=monthly so endpoints that also publish annual/quarterly
|
||||
series (e.g. facility-fuel) don't return non-monthly period formats that
|
||||
break iter_months. Routes through query_eia_api for retry/backoff coverage.
|
||||
"""
|
||||
def _one(direction: str) -> str:
|
||||
data = query_eia_api(
|
||||
endpoint,
|
||||
params={
|
||||
"length": 1,
|
||||
"sort[0][column]": "period",
|
||||
"sort[0][direction]": direction,
|
||||
},
|
||||
query_params={"frequency": "monthly"},
|
||||
)
|
||||
rows = (data or {}).get("response", {}).get("data", [])
|
||||
if not rows:
|
||||
raise RuntimeError(f"no rows returned discovering period range for {endpoint}")
|
||||
return rows[0]["period"]
|
||||
|
||||
return _one("asc"), _one("desc")
|
||||
|
||||
|
||||
def query_eia_api(
|
||||
endpoint: str,
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
extra_data_fields: Optional[List[str]] = None,
|
||||
query_params: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[Dict]:
|
||||
"""Query EIA API endpoint with retry/backoff on transient errors.
|
||||
|
||||
Returns parsed JSON on success. Raises EIAClientError on 4xx (caller
|
||||
decides whether to retry without extra fields). Raises requests.RequestException
|
||||
after exhausting retries on transient errors.
|
||||
"""
|
||||
if not endpoint.endswith("/data"):
|
||||
endpoint = f"{endpoint}/data"
|
||||
|
||||
|
||||
url = f"{EIA_API_BASE}/{endpoint}/"
|
||||
req_params = {"api_key": EIA_API_KEY, "length": 5000}
|
||||
req_params: Dict[str, Any] = {"api_key": EIA_API_KEY, "length": 5000}
|
||||
if query_params:
|
||||
req_params.update(query_params)
|
||||
if params:
|
||||
req_params.update(params)
|
||||
|
||||
# Add extra data fields (e.g., latitude, longitude) using EIA's array syntax
|
||||
|
||||
if extra_data_fields:
|
||||
for i, field in enumerate(extra_data_fields):
|
||||
req_params[f"data[{i}]"] = field
|
||||
|
||||
try:
|
||||
resp = requests.get(url, params=req_params, timeout=(10, 20))
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.RequestException as e:
|
||||
print(f" api error on {endpoint}: {e}")
|
||||
return None
|
||||
|
||||
max_attempts = 10
|
||||
base_backoff = 5.0
|
||||
max_backoff = 120.0
|
||||
last_exc: Optional[Exception] = None
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
resp = requests.get(url, params=req_params, timeout=(10, 120))
|
||||
if 400 <= resp.status_code < 500 and resp.status_code != 429:
|
||||
raise EIAClientError(f"HTTP {resp.status_code} on {endpoint}: {resp.text[:200]}")
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except EIAClientError:
|
||||
raise
|
||||
except (requests.Timeout, requests.ConnectionError, requests.HTTPError, ValueError) as e:
|
||||
last_exc = e
|
||||
if attempt == max_attempts:
|
||||
break
|
||||
sleep_s = min(base_backoff * (2 ** (attempt - 1)), max_backoff)
|
||||
print(f" api error on {endpoint} (attempt {attempt}/{max_attempts}): {e}; retrying in {sleep_s:.0f}s")
|
||||
time.sleep(sleep_s)
|
||||
raise last_exc # type: ignore[misc]
|
||||
|
||||
|
||||
def fetch_eia_records(
|
||||
def fetch_eia_pages(
|
||||
endpoint: str,
|
||||
max_records: int = 0,
|
||||
extra_data_fields: Optional[List[str]] = None,
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Fetch EIA records with pagination; retry without extra fields on unsupported endpoints."""
|
||||
query_params: Optional[Dict[str, Any]] = None,
|
||||
) -> Any:
|
||||
"""Yield paged EIA records; retry without extra fields on unsupported endpoints."""
|
||||
page_size = 5000
|
||||
offset = 0
|
||||
records: List[Dict[str, Any]] = []
|
||||
yielded = 0
|
||||
used_extra_fields = extra_data_fields
|
||||
previous_first_row: Optional[str] = None
|
||||
|
||||
while True:
|
||||
params = {"offset": offset, "length": page_size}
|
||||
data = query_eia_api(endpoint, params=params, extra_data_fields=used_extra_fields)
|
||||
|
||||
# Some endpoints return 400 when requesting unsupported fields (e.g. lat/lon).
|
||||
if (
|
||||
data is None
|
||||
and used_extra_fields
|
||||
and endpoint not in EIA_NO_RETRY_EXTRA_FIELDS
|
||||
):
|
||||
print(f" retrying {endpoint} without extra data fields")
|
||||
used_extra_fields = None
|
||||
data = query_eia_api(endpoint, params=params, extra_data_fields=None)
|
||||
try:
|
||||
data = query_eia_api(
|
||||
endpoint,
|
||||
params=params,
|
||||
extra_data_fields=used_extra_fields,
|
||||
query_params=query_params,
|
||||
)
|
||||
except EIAClientError as e:
|
||||
if used_extra_fields and endpoint not in EIA_NO_RETRY_EXTRA_FIELDS:
|
||||
print(f" retrying {endpoint} without extra data fields ({e})")
|
||||
used_extra_fields = None
|
||||
data = query_eia_api(
|
||||
endpoint,
|
||||
params=params,
|
||||
extra_data_fields=None,
|
||||
query_params=query_params,
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
if not data:
|
||||
return None if offset == 0 else records
|
||||
return
|
||||
|
||||
response = data.get("response", {})
|
||||
page_records = response.get("data", [])
|
||||
if not page_records:
|
||||
return records
|
||||
return
|
||||
|
||||
# Some EIA endpoints ignore offset and repeat page 1 forever.
|
||||
# Detect repeated first row signature and stop pagination.
|
||||
first_row_sig = json.dumps(page_records[0], sort_keys=True, default=str)
|
||||
if previous_first_row is not None and first_row_sig == previous_first_row:
|
||||
return records
|
||||
return
|
||||
previous_first_row = first_row_sig
|
||||
|
||||
records.extend(page_records)
|
||||
|
||||
total = response.get("total")
|
||||
try:
|
||||
total_int = int(total) if total is not None else None
|
||||
except (TypeError, ValueError):
|
||||
total_int = None
|
||||
|
||||
if max_records > 0 and len(records) >= max_records:
|
||||
return records[:max_records]
|
||||
if max_records > 0:
|
||||
remaining = max_records - yielded
|
||||
if remaining <= 0:
|
||||
return
|
||||
if len(page_records) > remaining:
|
||||
page_records = page_records[:remaining]
|
||||
|
||||
if total_int is not None and len(records) >= total_int:
|
||||
return records
|
||||
yield page_records, used_extra_fields
|
||||
yielded += len(page_records)
|
||||
|
||||
if max_records > 0 and yielded >= max_records:
|
||||
return
|
||||
|
||||
if total_int is not None and yielded >= total_int:
|
||||
return
|
||||
|
||||
if len(page_records) < page_size:
|
||||
return records
|
||||
return
|
||||
|
||||
offset += len(page_records)
|
||||
|
||||
|
||||
def fetch_eia_pages_by_month(
|
||||
endpoint: str,
|
||||
earliest: str,
|
||||
latest: str,
|
||||
max_records: int = 0,
|
||||
extra_data_fields: Optional[List[str]] = None,
|
||||
) -> Any:
|
||||
"""Yield pages across months, querying one month at a time.
|
||||
|
||||
EIA's bulk endpoints serve large offsets slowly and return frequent 503s
|
||||
under sustained load. Filtering by &frequency=monthly&start=X&end=X keeps
|
||||
each query small (~17k–28k rows per month for operating-generator-capacity)
|
||||
and dramatically reduces failure rate and wall time.
|
||||
"""
|
||||
yielded = 0
|
||||
for month in iter_months(earliest, latest):
|
||||
if max_records > 0 and yielded >= max_records:
|
||||
return
|
||||
remaining = max_records - yielded if max_records > 0 else 0
|
||||
month_params = {"frequency": "monthly", "start": month, "end": month}
|
||||
for page_records, used_extra_fields in fetch_eia_pages(
|
||||
endpoint,
|
||||
max_records=remaining,
|
||||
extra_data_fields=extra_data_fields,
|
||||
query_params=month_params,
|
||||
):
|
||||
yield page_records, used_extra_fields, month
|
||||
yielded += len(page_records)
|
||||
if max_records > 0 and yielded >= max_records:
|
||||
return
|
||||
|
||||
|
||||
def get_eia_datasets(category: str = "all") -> List[EIADataset]:
|
||||
"""Discover EIA datasets by category."""
|
||||
datasets = []
|
||||
@@ -256,79 +362,91 @@ def import_layer_to_postgis(dataset: EIADataset, table_name: str, max_records: i
|
||||
"""Import EIA dataset to PostGIS table."""
|
||||
conn = connect_db()
|
||||
try:
|
||||
# Check if this endpoint supports lat/lon geocoding
|
||||
extra_fields = EIA_GEOCODABLE_ENDPOINTS.get(dataset.api_endpoint)
|
||||
extra_fields = EIA_DATASET_DATA_FIELDS.get(dataset.api_endpoint)
|
||||
|
||||
# Query EIA API for data (with pagination), requesting lat/lon when supported.
|
||||
records = fetch_eia_records(
|
||||
earliest, latest = discover_period_range(dataset.api_endpoint)
|
||||
print(f" period range: {earliest} -> {latest}")
|
||||
|
||||
count = 0
|
||||
geo_count = 0
|
||||
initialized = False
|
||||
current_month: Optional[str] = None
|
||||
|
||||
for page_records, used_extra_fields, month in fetch_eia_pages_by_month(
|
||||
dataset.api_endpoint,
|
||||
earliest=earliest,
|
||||
latest=latest,
|
||||
max_records=max_records,
|
||||
extra_data_fields=extra_fields,
|
||||
)
|
||||
if not records:
|
||||
print(f" no data returned")
|
||||
):
|
||||
if month != current_month:
|
||||
if current_month is not None:
|
||||
print(f" progress: {count} rows ingested through {current_month}")
|
||||
current_month = month
|
||||
if not initialized:
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS public.{table_name} (
|
||||
gid SERIAL PRIMARY KEY,
|
||||
geom GEOMETRY(GEOMETRY, 4326),
|
||||
properties JSONB
|
||||
)
|
||||
"""
|
||||
)
|
||||
cur.execute(f"TRUNCATE TABLE public.{table_name}")
|
||||
initialized = True
|
||||
|
||||
geom_rows = []
|
||||
prop_rows = []
|
||||
for record in page_records:
|
||||
props_json = json.dumps(record)
|
||||
lat = record.get("latitude") or record.get("lat")
|
||||
lon = record.get("longitude") or record.get("lon")
|
||||
try:
|
||||
lat = float(lat) if lat is not None else None
|
||||
lon = float(lon) if lon is not None else None
|
||||
except (TypeError, ValueError):
|
||||
lat = lon = None
|
||||
|
||||
if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180:
|
||||
geom_rows.append((lon, lat, props_json))
|
||||
geo_count += 1
|
||||
else:
|
||||
prop_rows.append((props_json,))
|
||||
count += 1
|
||||
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
if geom_rows:
|
||||
execute_values(
|
||||
cur,
|
||||
f"INSERT INTO public.{table_name} (geom, properties) VALUES %s",
|
||||
geom_rows,
|
||||
template="(ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)",
|
||||
page_size=1000,
|
||||
)
|
||||
if prop_rows:
|
||||
execute_values(
|
||||
cur,
|
||||
f"INSERT INTO public.{table_name} (properties) VALUES %s",
|
||||
prop_rows,
|
||||
template="(%s)",
|
||||
page_size=1000,
|
||||
)
|
||||
|
||||
# Track if API ended up running without extra fields after retry.
|
||||
if used_extra_fields is None:
|
||||
extra_fields = None
|
||||
|
||||
if not initialized:
|
||||
print(" no data returned")
|
||||
return False
|
||||
|
||||
# Create target table only once data is confirmed.
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS public.{table_name} (
|
||||
gid SERIAL PRIMARY KEY,
|
||||
geom GEOMETRY(GEOMETRY, 4326),
|
||||
properties JSONB
|
||||
)
|
||||
"""
|
||||
)
|
||||
# Truncate to avoid duplicates on re-runs
|
||||
cur.execute(f"TRUNCATE TABLE public.{table_name}")
|
||||
|
||||
# Insert records into PostGIS using psycopg2.
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
count = 0
|
||||
geo_count = 0
|
||||
for record in records:
|
||||
try:
|
||||
props_json = json.dumps(record)
|
||||
# Try to extract lat/lon for geometry
|
||||
lat = record.get("latitude") or record.get("lat")
|
||||
lon = record.get("longitude") or record.get("lon")
|
||||
try:
|
||||
lat = float(lat) if lat is not None else None
|
||||
lon = float(lon) if lon is not None else None
|
||||
except (TypeError, ValueError):
|
||||
lat = lon = None
|
||||
|
||||
if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO public.{table_name} (geom, properties)
|
||||
VALUES (ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
|
||||
""",
|
||||
(lon, lat, props_json),
|
||||
)
|
||||
geo_count += 1
|
||||
else:
|
||||
cur.execute(
|
||||
f"""
|
||||
INSERT INTO public.{table_name} (properties)
|
||||
VALUES (%s)
|
||||
""",
|
||||
(props_json,),
|
||||
)
|
||||
count += 1
|
||||
except Exception as e:
|
||||
print(f" row insert error: {e}")
|
||||
continue
|
||||
|
||||
geo_msg = f", {geo_count} with geometry" if extra_fields else ""
|
||||
print(f" inserted {count} features into {table_name}{geo_msg}")
|
||||
return count > 0
|
||||
except Exception as e:
|
||||
print(f" error: {e}")
|
||||
return False
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
@@ -652,6 +770,153 @@ def build_summary_table(conn):
|
||||
cur.execute(f"analyze {SUMMARY_TABLE}")
|
||||
|
||||
|
||||
def build_flat_tables(conn):
|
||||
"""Create analyst-friendly flat tables from JSON properties."""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select table_name
|
||||
from information_schema.tables
|
||||
where table_schema='public'
|
||||
and table_name in (
|
||||
'energy_eia_electricity_operating_generator_capacity',
|
||||
'energy_eia_electricity_facility_fuel'
|
||||
)
|
||||
"""
|
||||
)
|
||||
available = {row[0] for row in cur.fetchall()}
|
||||
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("drop table if exists public.energy_eia_electric_power_operational_data_flat")
|
||||
cur.execute("drop table if exists public.energy_eia_rto_region_data_flat")
|
||||
|
||||
if "energy_eia_electricity_operating_generator_capacity" in available:
|
||||
cur.execute("drop table if exists public.energy_eia_operating_generator_capacity_flat")
|
||||
# EIA stored lower-48 longitudes as positive numbers for periods
|
||||
# 2008-01 through 2010-11 (~600k rows). The negative sign is
|
||||
# restored here for any state other than AK (Alaska legitimately
|
||||
# has Aleutian plants east of the dateline with positive lons).
|
||||
# geom is rebuilt from the corrected coordinates so the source
|
||||
# table's pre-correction geometry is discarded.
|
||||
cur.execute(
|
||||
r"""
|
||||
create table public.energy_eia_operating_generator_capacity_flat as
|
||||
with parsed as (
|
||||
select
|
||||
gid,
|
||||
properties,
|
||||
properties->>'stateid' as state_id_raw,
|
||||
case
|
||||
when (properties->>'latitude') ~ '^-?[0-9]+(\.[0-9]+)?$'
|
||||
then (properties->>'latitude')::double precision
|
||||
end as latitude_raw,
|
||||
case
|
||||
when (properties->>'longitude') ~ '^-?[0-9]+(\.[0-9]+)?$'
|
||||
then (properties->>'longitude')::double precision
|
||||
end as longitude_raw
|
||||
from public.energy_eia_electricity_operating_generator_capacity
|
||||
),
|
||||
fixed as (
|
||||
select
|
||||
*,
|
||||
case
|
||||
when longitude_raw > 0 and state_id_raw <> 'AK'
|
||||
then -longitude_raw
|
||||
else longitude_raw
|
||||
end as longitude_fixed
|
||||
from parsed
|
||||
)
|
||||
select
|
||||
gid,
|
||||
case
|
||||
when latitude_raw is not null
|
||||
and longitude_fixed is not null
|
||||
and latitude_raw between -90 and 90
|
||||
and longitude_fixed between -180 and 180
|
||||
then st_setsrid(st_makepoint(longitude_fixed, latitude_raw), 4326)
|
||||
end as geom,
|
||||
properties->>'period' as period,
|
||||
properties->>'plantid' as plant_id,
|
||||
properties->>'plantName' as plant_name,
|
||||
state_id_raw as state_id,
|
||||
properties->>'stateName' as state_name,
|
||||
properties->>'entityid' as entity_id,
|
||||
properties->>'entityName' as entity_name,
|
||||
properties->>'generatorid' as generator_id,
|
||||
properties->>'status' as status,
|
||||
properties->>'sector' as sector,
|
||||
properties->>'sectorName' as sector_name,
|
||||
properties->>'energy_source_code' as energy_source_code,
|
||||
properties->>'energy-source-desc' as energy_source_desc,
|
||||
properties->>'prime_mover_code' as prime_mover_code,
|
||||
properties->>'balancing_authority_code' as balancing_authority_code,
|
||||
properties->>'balancing-authority-name' as balancing_authority_name,
|
||||
latitude_raw as latitude,
|
||||
longitude_fixed as longitude,
|
||||
properties as raw_properties
|
||||
from fixed
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_operating_generator_capacity_flat_geom_gix "
|
||||
"on public.energy_eia_operating_generator_capacity_flat using gist (geom)"
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_operating_generator_capacity_flat_plant_id_idx "
|
||||
"on public.energy_eia_operating_generator_capacity_flat (plant_id)"
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_operating_generator_capacity_flat_state_id_idx "
|
||||
"on public.energy_eia_operating_generator_capacity_flat (state_id)"
|
||||
)
|
||||
cur.execute("analyze public.energy_eia_operating_generator_capacity_flat")
|
||||
|
||||
if "energy_eia_electricity_facility_fuel" in available:
|
||||
cur.execute("drop table if exists public.energy_eia_facility_fuel_flat")
|
||||
cur.execute(
|
||||
r"""
|
||||
create table public.energy_eia_facility_fuel_flat as
|
||||
select
|
||||
gid,
|
||||
properties->>'period' as period,
|
||||
coalesce(properties->>'plantCode', properties->>'plantid') as plant_id,
|
||||
properties->>'plantName' as plant_name,
|
||||
properties->>'state' as state_id,
|
||||
properties->>'stateDescription' as state_name,
|
||||
properties->>'primeMover' as prime_mover_code,
|
||||
properties->>'primeMoverDescription' as prime_mover_desc,
|
||||
properties->>'fuel2002' as energy_source_code,
|
||||
properties->>'fuel2002Description' as energy_source_desc,
|
||||
case
|
||||
when (properties->>'generation') ~ '^-?[0-9]+(\.[0-9]+)?$'
|
||||
then (properties->>'generation')::double precision
|
||||
else null
|
||||
end as generation_mwh,
|
||||
case
|
||||
when (properties->>'gross-generation') ~ '^-?[0-9]+(\.[0-9]+)?$'
|
||||
then (properties->>'gross-generation')::double precision
|
||||
else null
|
||||
end as gross_generation_mwh,
|
||||
properties as raw_properties
|
||||
from public.energy_eia_electricity_facility_fuel
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_facility_fuel_flat_plant_id_idx "
|
||||
"on public.energy_eia_facility_fuel_flat (plant_id)"
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_facility_fuel_flat_period_idx "
|
||||
"on public.energy_eia_facility_fuel_flat (period)"
|
||||
)
|
||||
cur.execute(
|
||||
"create index energy_eia_facility_fuel_flat_state_id_idx "
|
||||
"on public.energy_eia_facility_fuel_flat (state_id)"
|
||||
)
|
||||
cur.execute("analyze public.energy_eia_facility_fuel_flat")
|
||||
|
||||
|
||||
def prune_stale_layer_versions(conn) -> int:
|
||||
"""Drop superseded EIA layer tables and remove stale catalog rows.
|
||||
|
||||
@@ -718,6 +983,90 @@ def prune_stale_layer_versions(conn) -> int:
|
||||
return pruned
|
||||
|
||||
|
||||
def prune_unselected_layers(conn, selected_table_names: List[str]) -> int:
|
||||
"""Drop catalog/table entries that are not in the currently selected dataset set."""
|
||||
selected = set(selected_table_names)
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select table_name
|
||||
from public.energy_atlas_layers_catalog
|
||||
where table_name like 'energy_eia_%'
|
||||
"""
|
||||
)
|
||||
existing = [row[0] for row in cur.fetchall()]
|
||||
|
||||
to_remove = [name for name in existing if name not in selected]
|
||||
removed = 0
|
||||
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
for table_name in to_remove:
|
||||
cur.execute(
|
||||
"""
|
||||
select exists (
|
||||
select 1
|
||||
from information_schema.tables
|
||||
where table_schema='public' and table_name=%s
|
||||
)
|
||||
""",
|
||||
(table_name,),
|
||||
)
|
||||
table_exists = cur.fetchone()[0]
|
||||
|
||||
if table_exists:
|
||||
cur.execute(
|
||||
sql.SQL("drop table if exists public.{} cascade").format(
|
||||
sql.Identifier(table_name)
|
||||
)
|
||||
)
|
||||
print(f"pruned unselected table public.{table_name}")
|
||||
|
||||
cur.execute(
|
||||
"delete from public.energy_atlas_layers_catalog where table_name = %s",
|
||||
(table_name,),
|
||||
)
|
||||
removed += 1
|
||||
|
||||
return removed
|
||||
|
||||
|
||||
FINAL_FLAT_TABLES = (
|
||||
"energy_eia_operating_generator_capacity_flat",
|
||||
"energy_eia_facility_fuel_flat",
|
||||
)
|
||||
|
||||
|
||||
def keep_only_target_flat_table(conn) -> int:
|
||||
"""Drop all energy_eia_* tables except the final flat tables."""
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
select table_name
|
||||
from information_schema.tables
|
||||
where table_schema='public'
|
||||
and table_name like 'energy_eia_%%'
|
||||
and table_name <> ALL(%s)
|
||||
""",
|
||||
(list(FINAL_FLAT_TABLES),),
|
||||
)
|
||||
to_drop = [row[0] for row in cur.fetchall()]
|
||||
|
||||
dropped = 0
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
for table_name in to_drop:
|
||||
cur.execute(
|
||||
sql.SQL("drop table if exists public.{} cascade").format(
|
||||
sql.Identifier(table_name)
|
||||
)
|
||||
)
|
||||
print(f"dropped non-target table public.{table_name}")
|
||||
dropped += 1
|
||||
|
||||
return dropped
|
||||
|
||||
|
||||
def parse_args():
|
||||
"""Parse command-line arguments."""
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -727,8 +1076,8 @@ def parse_args():
|
||||
)
|
||||
parser.add_argument(
|
||||
"--category",
|
||||
choices=["electric", "power", "gas", "all"],
|
||||
default="all",
|
||||
choices=["power", "all"],
|
||||
default="power",
|
||||
help="Infrastructure category to ingest.",
|
||||
)
|
||||
parser.add_argument(
|
||||
@@ -737,6 +1086,16 @@ def parse_args():
|
||||
default=0,
|
||||
help="Cap on API records to process per dataset (0=all).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--endpoint",
|
||||
action="append",
|
||||
default=None,
|
||||
help=(
|
||||
"Limit ingest to specific EIA endpoint(s). "
|
||||
"Repeatable. Substring match against api_endpoint. "
|
||||
"Other datasets are skipped (not re-ingested, not pruned)."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-ingest",
|
||||
action="store_true",
|
||||
@@ -775,6 +1134,8 @@ def main():
|
||||
# Build ingest list with table names
|
||||
datasets_to_ingest = []
|
||||
for dataset in datasets:
|
||||
if args.endpoint and not any(filt in dataset.api_endpoint for filt in args.endpoint):
|
||||
continue
|
||||
table_name = standardize_table_name(dataset.dataset_id)
|
||||
datasets_to_ingest.append((dataset, table_name, dataset.category))
|
||||
|
||||
@@ -793,69 +1154,49 @@ def main():
|
||||
|
||||
if not args.skip_ingest:
|
||||
for dataset, table_name, category in datasets_to_ingest:
|
||||
try:
|
||||
print(f"importing {dataset.name} -> public.{table_name} [{category}]")
|
||||
success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records)
|
||||
if success:
|
||||
upsert_layer_catalog(conn, table_name, dataset, category)
|
||||
add_geom_index_and_analyze(conn, table_name)
|
||||
except Exception as e:
|
||||
print(f" warning: import failed ({type(e).__name__}); skipping")
|
||||
continue
|
||||
print(f"importing {dataset.name} -> public.{table_name} [{category}]")
|
||||
success = import_layer_to_postgis(dataset, table_name, max_records=args.max_records)
|
||||
if success:
|
||||
upsert_layer_catalog(conn, table_name, dataset, category)
|
||||
add_geom_index_and_analyze(conn, table_name)
|
||||
|
||||
if not args.keep_stale_tables:
|
||||
pruned = prune_stale_layer_versions(conn)
|
||||
if pruned > 0:
|
||||
print(f"pruned stale layer versions: {pruned}")
|
||||
|
||||
# Rebuild GEOID links from catalog.
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
with ranked as (
|
||||
select
|
||||
c.table_name,
|
||||
c.category,
|
||||
row_number() over (
|
||||
partition by coalesce(
|
||||
nullif(regexp_replace(c.source_url, '/data/?$', ''), ''),
|
||||
nullif(c.source_item_id, ''),
|
||||
c.table_name
|
||||
)
|
||||
order by c.imported_at desc, c.table_name desc
|
||||
) as rn
|
||||
from public.energy_atlas_layers_catalog c
|
||||
join information_schema.tables t
|
||||
on t.table_schema = 'public'
|
||||
and t.table_name = c.table_name
|
||||
)
|
||||
select table_name, category
|
||||
from ranked
|
||||
where rn = 1
|
||||
order by table_name
|
||||
"""
|
||||
)
|
||||
catalog_rows = cur.fetchall()
|
||||
# Pruning compares against the *full* selected set; skip when --endpoint
|
||||
# is narrowing the run, otherwise we'd drop catalog entries for endpoints
|
||||
# we deliberately chose not to touch.
|
||||
if not args.endpoint:
|
||||
selected_table_names = [table_name for _, table_name, _ in datasets_to_ingest]
|
||||
removed = prune_unselected_layers(conn, selected_table_names)
|
||||
if removed > 0:
|
||||
print(f"pruned unselected layers: {removed}")
|
||||
|
||||
reset_link_tables(conn)
|
||||
for table_name, category in catalog_rows:
|
||||
print(f"linking public.{table_name} -> GEOID ({category})")
|
||||
link_one_table(conn, table_name, category)
|
||||
if not args.keep_stale_tables:
|
||||
pruned = prune_stale_layer_versions(conn)
|
||||
if pruned > 0:
|
||||
print(f"pruned stale layer versions: {pruned}")
|
||||
|
||||
build_summary_table(conn)
|
||||
build_flat_tables(conn)
|
||||
dropped_non_target = keep_only_target_flat_table(conn)
|
||||
if dropped_non_target > 0:
|
||||
print(f"dropped non-target energy tables: {dropped_non_target}")
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select count(*) from public.energy_atlas_layers_catalog")
|
||||
catalog_count = cur.fetchone()[0]
|
||||
cur.execute(f"select count(*) from {LINK_TABLE}")
|
||||
link_count = cur.fetchone()[0]
|
||||
cur.execute(f"select count(*) from {SUMMARY_TABLE}")
|
||||
summary_count = cur.fetchone()[0]
|
||||
counts = {}
|
||||
for tbl in FINAL_FLAT_TABLES:
|
||||
cur.execute(
|
||||
"select to_regclass(%s) is not null",
|
||||
(f"public.{tbl}",),
|
||||
)
|
||||
if cur.fetchone()[0]:
|
||||
cur.execute(f"select count(*) from public.{tbl}")
|
||||
counts[tbl] = cur.fetchone()[0]
|
||||
else:
|
||||
counts[tbl] = None
|
||||
|
||||
print(
|
||||
f"\ndone: catalog_layers={catalog_count}, "
|
||||
f"geoid_links={link_count}, geoid_summary_rows={summary_count}"
|
||||
)
|
||||
print(f"\ndone: catalog_layers={catalog_count}")
|
||||
for tbl, n in counts.items():
|
||||
print(f" {tbl}: {n if n is not None else 'missing'} rows")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user