update ingest_eia

This commit is contained in:
2026-05-15 20:53:42 -07:00
parent 4e6a564b6c
commit b442998eb5
2 changed files with 96 additions and 1 deletions

View File

@@ -29,6 +29,7 @@ from typing import List, Optional, Dict, Any
import psycopg2
import requests
from psycopg2 import sql
DB_NAME = "data_centers"
@@ -651,6 +652,72 @@ def build_summary_table(conn):
cur.execute(f"analyze {SUMMARY_TABLE}")
def prune_stale_layer_versions(conn) -> int:
    """Drop superseded EIA layer tables and remove stale catalog rows.

    Catalog rows are grouped by a normalized source key: ``source_url``
    with a trailing ``/data`` (optionally ``/data/``) removed, falling back
    to ``source_item_id`` and finally to ``table_name``. One row per group
    is kept; every other row in the group is treated as stale.

    The kept row is the most recently imported entry whose table exists in
    the ``public`` schema. Only when none of the group's tables exist is
    the most recent dangling entry kept. Ranking existing tables first
    guarantees that a catalog row pointing at a missing table can never
    cause the last real table for a source to be dropped.

    Only tables whose name starts with ``energy_eia_`` are touched; stale
    rows for any other table are left alone.

    Args:
        conn: Open database connection (psycopg2-style: provides
            ``cursor()`` and works as a transaction context manager).

    Returns:
        Number of stale catalog entries removed.
    """
    pruned = 0

    # ``with conn`` commits when the block succeeds and rolls back if any
    # statement raises, so the lookup, drops and catalog deletes are
    # applied together or not at all.
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                with ranked as (
                    select
                        c.table_name,
                        (t.table_name is not null) as table_exists,
                        row_number() over (
                            partition by coalesce(
                                nullif(regexp_replace(c.source_url, '/data/?$', ''), ''),
                                nullif(c.source_item_id, ''),
                                c.table_name
                            )
                            order by
                                (t.table_name is not null) desc,
                                c.imported_at desc,
                                c.table_name desc
                        ) as rn
                    from public.energy_atlas_layers_catalog c
                    left join information_schema.tables t
                      on t.table_schema = 'public'
                     and t.table_name = c.table_name
                )
                select r.table_name, r.table_exists
                from ranked r
                where r.rn > 1
                """
            )
            stale_rows = cur.fetchall()

            for table_name, table_exists in stale_rows:
                # Guardrail: only manage script-owned EIA tables.
                if not table_name.startswith("energy_eia_"):
                    continue

                if table_exists:
                    # ``if exists`` keeps the drop harmless should the table
                    # vanish between the lookup above and this statement.
                    cur.execute(
                        sql.SQL("drop table if exists public.{} cascade").format(
                            sql.Identifier(table_name)
                        )
                    )
                    print(f"pruned stale table public.{table_name}")

                # The catalog row is removed whether or not a table existed,
                # so dangling entries are cleaned up as well.
                cur.execute(
                    "delete from public.energy_atlas_layers_catalog where table_name = %s",
                    (table_name,),
                )
                pruned += 1

    return pruned
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
@@ -680,6 +747,11 @@ def parse_args():
action="store_true",
help="List selected datasets and exit.",
)
parser.add_argument(
"--keep-stale-tables",
action="store_true",
help="Do not prune superseded EIA tables/catalog entries.",
)
return parser.parse_args()
@@ -731,12 +803,35 @@ def main():
print(f" warning: import failed ({type(e).__name__}); skipping")
continue
if not args.keep_stale_tables:
pruned = prune_stale_layer_versions(conn)
if pruned > 0:
print(f"pruned stale layer versions: {pruned}")
# Rebuild GEOID links from catalog.
with conn.cursor() as cur:
cur.execute(
"""
with ranked as (
select
c.table_name,
c.category,
row_number() over (
partition by coalesce(
nullif(regexp_replace(c.source_url, '/data/?$', ''), ''),
nullif(c.source_item_id, ''),
c.table_name
)
order by c.imported_at desc, c.table_name desc
) as rn
from public.energy_atlas_layers_catalog c
join information_schema.tables t
on t.table_schema = 'public'
and t.table_name = c.table_name
)
select table_name, category
from public.energy_atlas_layers_catalog
from ranked
where rn = 1
order by table_name
"""
)