def prune_stale_layer_versions(conn) -> int:
    """Drop superseded EIA layer tables and remove their stale catalog rows.

    Catalog rows are grouped by a normalized source key: ``source_url``
    with a trailing ``/data`` removed, falling back to ``source_item_id``
    and finally to ``table_name``.  Within each group one row is kept and
    every other row is treated as superseded.

    The kept row is the newest one (``imported_at``, then ``table_name``,
    both descending) *among rows whose table actually exists* in the
    ``public`` schema; rows whose table is missing only win when no table
    exists for that source at all.  This mirrors the GEOID-link rebuild
    query, which ranks only catalog rows joined to an existing table, so
    the table that query selects is never the one dropped here.

    Only tables whose name starts with ``energy_eia_`` are touched; any
    other superseded catalog row is left alone.

    Args:
        conn: An open psycopg2 connection.  It is used as a context
            manager, so the deletions are committed on success and rolled
            back if any statement raises.

    Returns:
        The number of catalog rows removed (counted whether or not the
        corresponding table still existed).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            with ranked as (
                select
                    c.table_name,
                    (t.table_name is not null) as table_exists,
                    row_number() over (
                        partition by coalesce(
                            nullif(regexp_replace(c.source_url, '/data/?$', ''), ''),
                            nullif(c.source_item_id, ''),
                            c.table_name
                        )
                        order by
                            (t.table_name is not null) desc,
                            c.imported_at desc,
                            c.table_name desc
                    ) as rn
                from public.energy_atlas_layers_catalog c
                left join information_schema.tables t
                    on t.table_schema = 'public'
                    and t.table_name = c.table_name
            )
            select r.table_name, r.table_exists
            from ranked r
            where r.rn > 1
            """
        )
        stale_rows = cur.fetchall()

    pruned = 0
    # ``with conn`` commits on success / rolls back on error; it also ends
    # the transaction opened by the SELECT above when nothing is stale.
    with conn:
        with conn.cursor() as cur:
            for table_name, table_exists in stale_rows:
                # Guardrail: only manage script-owned EIA tables.  The type
                # check keeps a NULL table_name from raising AttributeError.
                if not isinstance(table_name, str):
                    continue
                if not table_name.startswith("energy_eia_"):
                    continue

                if table_exists:
                    # Identifier quoting: table names cannot be bound as
                    # ordinary query parameters.
                    cur.execute(
                        sql.SQL("drop table if exists public.{} cascade").format(
                            sql.Identifier(table_name)
                        )
                    )
                    print(f"pruned stale table public.{table_name}")

                cur.execute(
                    "delete from public.energy_atlas_layers_catalog where table_name = %s",
                    (table_name,),
                )
                pruned += 1

    return pruned