#!/usr/bin/env python3 """ Ingest LegiScan legislative datasets for all US states (2016-2026) into PostgreSQL. Fetches all state session datasets from the LegiScan API, parses bill JSONs from each ZIP archive, and loads them into the data_centers PostgreSQL database. Bills are tagged with relevance categories (data_center, large_load, ratepayer_protection, etc.). Usage: python ingest_legiscan.py [--all | --setup-db | --fetch | --load | --tag] [--state XX] [--year-start YYYY] [--dry-run] [--verbose] Options: --all Run all phases in sequence --setup-db Create/update database tables and indexes --fetch Download dataset ZIPs for all states (uses hash caching) --load Parse cached ZIPs and insert/update bills in DB --tag (Re)apply relevance tagging to all loaded bills --state XX Restrict to one state (e.g., CA) --year-start N Earliest session year to include (default: 2016) --dry-run Print what would be done; no API calls or DB writes --verbose Extra progress output Environment: LEGISCAN_API_KEY Required PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD PostgreSQL connection (DB: data_centers) """ import argparse import base64 import io import json import logging import os import sys import time import zipfile from datetime import datetime from pathlib import Path from typing import Optional import psycopg2 import psycopg2.extras import requests # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- DB_NAME = "data_centers" API_KEY = os.environ.get("LEGISCAN_API_KEY") API_BASE = "https://api.legiscan.com/" CACHE_DIR = Path("data/legiscan_cache") MIN_YEAR_DEFAULT = 2016 RATE_LIMIT_DELAY = 0.5 # seconds between API calls # Keyword categories for relevance tagging. # Keys become the tag values stored in legiscan_bills.relevance_tags[]. 
RELEVANCE_KEYWORDS: dict[str, list[str]] = {
    "data_center": [
        "data center",
        "data centre",
        "hyperscale",
        "colocation",
        "colo facility",
        "server farm",
        "cloud computing facility",
        "internet exchange",
        "carrier hotel",
        "artificial intelligence facility",
        "ai campus",
        "ai data center",
        "gpu cluster",
        "compute facility",
        "high performance computing",
        "hpc facility",
        "data hall",
        "network access point",
        "data warehousing facility",
    ],
    "large_load": [
        "large load",
        "large power consumer",
        "large electricity consumer",
        "high electricity consumption",
        "high power consumption",
        "megawatt load",
        "gigawatt load",
        "cryptocurrency mining",
        "bitcoin mining",
        "blockchain mining",
        "crypto mining",
        "digital asset mining",
        "proof of work",
        "electric arc furnace",
        "large industrial customer",
        "high-density load",
        "new large load",
        "load growth",
        "extraordinary load",
    ],
    "ratepayer_protection": [
        "ratepayer",
        "rate payer",
        "cost shift",
        "cost shifting",
        "cost allocation",
        "cross-subsidy",
        "cross subsidy",
        "rate design",
        "rate structure",
        "electricity rate",
        "electric rate",
        "utility rate",
        "rate increase",
        "rate burden",
        "rate base",
        "stranded cost",
        "rate class",
        "customer protection",
        "consumer protection",
        "electric customer",
        "residential customer",
        "demand charge",
        "transmission cost",
        "grid upgrade cost",
        "interconnection cost",
        "cost recovery",
        "rate relief",
        "affordability",
        "energy burden",
    ],
    "grid_impact": [
        "grid reliability",
        "grid stability",
        "grid congestion",
        "grid modernization",
        "grid infrastructure",
        "electric grid",
        "power grid",
        "electricity grid",
        "transmission upgrade",
        "transmission expansion",
        "interconnection queue",
        "interconnection study",
        "demand response",
        "curtailment",
        "grid capacity",
        "system reliability",
        "capacity expansion",
        "electric system",
        "power system reliability",
        "grid resilience",
        "grid planning",
        "integrated resource plan",
    ],
    "water_use": [
        "water consumption",
        "cooling water",
        "water efficiency",
        "water use effectiveness",
        "evaporative cooling",
        "water withdrawal",
        "water discharge",
        "water impact",
        "water footprint",
        "cooling tower",
        "water-cooled",
        "once-through cooling",
        "recycled water",
        "water stress",
        "water scarcity",
    ],
    "tax_incentive": [
        "tax credit",
        "tax exemption",
        "tax abatement",
        "tax incentive",
        "sales tax exemption",
        "property tax exemption",
        "tax break",
        "tax relief",
        "enterprise zone",
        "economic incentive",
        "business incentive",
        "investment credit",
        "job creation credit",
        "economic development incentive",
        "opportunity zone",
        "tax subsidy",
    ],
    "energy_policy": [
        "renewable energy",
        "clean energy",
        "energy efficiency",
        "power purchase agreement",
        " ppa ",
        "green tariff",
        "clean power",
        "carbon neutral",
        "net zero",
        "decarbonization",
        "energy procurement",
        "24/7 clean energy",
        "carbon-free",
        "clean electricity",
        "energy storage",
        "virtual power plant",
        "net metering",
        "green power",
    ],
    "siting_permitting": [
        "conditional use permit",
        "special use permit",
        "land use permit",
        "zoning",
        "facility siting",
        "environmental review",
        "environmental impact",
        "noise ordinance",
        "setback requirement",
        "building permit",
        "construction permit",
        "site approval",
        "local approval",
        "permit requirement",
        "permitting process",
        "local control",
        "preemption",
    ],
}

# Status code labels (LegiScan)
STATUS_LABELS = {
    0: "N/A",
    1: "Introduced",
    2: "Engrossed",
    3: "Enrolled",
    4: "Passed",
    5: "Vetoed",
    6: "Failed",
    7: "Override",
    8: "Chaptered",
    9: "Referred",
    10: "Report Pass",
    11: "Report DNP",
    12: "Draft",
}

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------

def get_db_connection():
    """Open a psycopg2 connection to the ``DB_NAME`` database.

    Host, port, user and password are read from the PGWEB_HOST, PGWEB_PORT,
    PGWEB_USER and PGWEB_PASSWORD environment variables.

    Raises:
        KeyError: If any of the four environment variables is unset.
    """
    # Looked up in a fixed order so the first missing variable is the one
    # named in the KeyError.
    settings = {
        field: os.environ[f"PGWEB_{field.upper()}"]
        for field in ("host", "port", "user", "password")
    }
    return psycopg2.connect(dbname=DB_NAME, **settings)


# Schema for the two tables this script owns. Every statement uses
# IF NOT EXISTS, so the whole script can be re-executed against an existing
# database.
DDL = """
CREATE TABLE IF NOT EXISTS legiscan_sessions (
    session_id INTEGER PRIMARY KEY,
    state_id INTEGER NOT NULL,
    state_abbr VARCHAR(2) NOT NULL,
    year_start INTEGER NOT NULL,
    year_end INTEGER NOT NULL,
    session_title TEXT,
    session_tag TEXT,
    is_special BOOLEAN DEFAULT FALSE,
    is_prior BOOLEAN DEFAULT FALSE,
    dataset_hash VARCHAR(32),
    dataset_date DATE,
    dataset_size_mb FLOAT,
    bill_count INTEGER DEFAULT 0,
    imported_at TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS legiscan_bills (
    bill_id INTEGER PRIMARY KEY,
    session_id INTEGER REFERENCES legiscan_sessions(session_id),
    state VARCHAR(2) NOT NULL,
    bill_number VARCHAR(50),
    bill_type VARCHAR(10),
    title TEXT,
    description TEXT,
    status INTEGER,
    status_date DATE,
    completed INTEGER DEFAULT 0,
    body VARCHAR(10),
    url TEXT,
    state_link TEXT,
    change_hash VARCHAR(32),
    subjects TEXT[],
    sponsor_count INTEGER DEFAULT 0,
    vote_count INTEGER DEFAULT 0,
    text_count INTEGER DEFAULT 0,
    is_relevant BOOLEAN DEFAULT FALSE,
    relevance_tags TEXT[],
    imported_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_ls_bills_state ON legiscan_bills(state);
CREATE INDEX IF NOT EXISTS idx_ls_bills_session ON legiscan_bills(session_id);
CREATE INDEX IF NOT EXISTS idx_ls_bills_status ON legiscan_bills(status);
CREATE INDEX IF NOT EXISTS idx_ls_bills_relevant ON legiscan_bills(is_relevant) WHERE is_relevant;
CREATE INDEX IF NOT EXISTS idx_ls_bills_subjects ON legiscan_bills USING gin(subjects);
CREATE INDEX IF NOT EXISTS idx_ls_bills_rtags ON legiscan_bills USING gin(relevance_tags);
CREATE INDEX IF NOT EXISTS idx_ls_bills_fts ON legiscan_bills
    USING gin(to_tsvector('english', COALESCE(title, '') || ' ' || COALESCE(description, '')));
"""


def setup_db(conn):
    """Run the schema DDL on *conn* and commit.

    Args:
        conn: An open psycopg2 connection.
    """
    cursor = conn.cursor()
    with cursor:
        cursor.execute(DDL)
    conn.commit()
    log.info("Database tables and indexes ready.")
# ---------------------------------------------------------------------------
# LegiScan API helpers
# ---------------------------------------------------------------------------

def _api_get(params: dict, timeout: int = 120) -> dict:
    """Make one LegiScan API call and return the parsed JSON.

    Args:
        params: Query parameters for the call (e.g. ``{"op": "getDatasetList"}``).
            The dict is left untouched; the API key is added to a copy.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The decoded JSON payload.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
        RuntimeError: If the payload's ``status`` field is not ``"OK"``.
    """
    # Copy so the secret never leaks into a dict the caller may log or reuse.
    query = {**params, "key": API_KEY}
    resp = requests.get(API_BASE, params=query, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()
    if data.get("status") != "OK":
        raise RuntimeError(f"LegiScan API error: {data}")
    return data


def get_all_dataset_metadata(year_start: int, state_filter: Optional[str] = None) -> list[dict]:
    """Fetch the dataset list (one API call) and keep sessions from year_start on.

    Args:
        year_start: Earliest ``year_start`` a session may have to be kept.
        state_filter: Optional state abbreviation. When given it is sent as
            the ``state`` parameter of ``getDatasetList`` so the API returns
            only that state's sessions.

    Returns:
        The session dicts from the API's ``datasetlist`` that pass the filters.
    """
    log.info("Fetching dataset list from LegiScan…")
    params = {"op": "getDatasetList"}
    if state_filter:
        # Let the API do the state filtering rather than downloading the full
        # list and intersecting it with a second, per-state request.
        log.info(f" Filtering to state {state_filter}…")
        params["state"] = state_filter
    sessions = _api_get(params)["datasetlist"]
    log.info(f" Total sessions returned: {len(sessions)}")

    sessions = [s for s in sessions if s["year_start"] >= year_start]
    log.info(f" Sessions matching filters: {len(sessions)}")
    return sessions


def download_dataset_zip(session: dict, dry_run: bool = False) -> tuple[Optional[bytes], bool]:
    """Download a dataset ZIP via the API; cache to disk.

    Args:
        session: A session dict from ``getDatasetList``; ``session_id``,
            ``dataset_hash``, ``access_key`` and ``dataset_size`` are read.
        dry_run: When True, a cache miss is only logged; nothing is downloaded
            or written.

    Returns:
        (zip_bytes, api_call_made) — api_call_made is True only when the
        network was actually hit so the caller can rate-limit appropriately.
        ``zip_bytes`` is None for a dry-run cache miss.
    """
    session_id = session["session_id"]
    dataset_hash = session["dataset_hash"]
    access_key = session["access_key"]

    # The hash is part of the file name, so a changed dataset is a cache miss.
    cache_path = CACHE_DIR / f"{session_id}_{dataset_hash}.zip"

    if cache_path.exists():
        log.debug(f" Cache hit: {cache_path.name}")
        return cache_path.read_bytes(), False

    if dry_run:
        log.info(f" [dry-run] Would download session {session_id} ({session['dataset_size'] / 1e6:.1f} MB)")
        return None, False

    log.info(f" Downloading session {session_id} ({session['dataset_size'] / 1e6:.1f} MB)…")
    data = _api_get({"op": "getDataset", "access_key": access_key, "id": session_id})
    zip_bytes = base64.b64decode(data["dataset"]["zip"])

    # Write to a sibling temp file and rename it into place so an interrupted
    # write can never be mistaken for a valid cache entry on the next run.
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    partial_path = cache_path.with_name(cache_path.name + ".part")
    partial_path.write_bytes(zip_bytes)
    partial_path.replace(cache_path)
    log.info(f" Cached → {cache_path.name}")
    return zip_bytes, True


# ---------------------------------------------------------------------------
# Relevance tagging
# ---------------------------------------------------------------------------

def score_relevance(
    title: str,
    description: str,
    subjects: list[str],
    keywords: Optional[dict[str, list[str]]] = None,
) -> tuple[bool, list[str]]:
    """Return (is_relevant, list_of_matched_tags).

    Matching is a case-insensitive substring test of every keyword against
    the title, description and subjects joined with spaces.

    Args:
        title: Bill title; None is treated as empty.
        description: Bill description; None is treated as empty.
        subjects: Subject names; None is treated as an empty list.
        keywords: Optional ``{tag: [keyword, ...]}`` table. Defaults to the
            module-level ``RELEVANCE_KEYWORDS``.

    Returns:
        ``(bool(tags), tags)`` where ``tags`` lists, in table order, every tag
        with at least one keyword found in the text.
    """
    table = RELEVANCE_KEYWORDS if keywords is None else keywords
    text = " ".join([
        (title or "").lower(),
        (description or "").lower(),
        " ".join(s.lower() for s in (subjects or [])),
    ])
    # Pad with spaces so space-delimited keywords such as " ppa " can also
    # match when the word is the very first or last token of the text.
    haystack = f" {text} "

    tags = [tag for tag, kws in table.items() if any(kw in haystack for kw in kws)]
    return bool(tags), tags


# ---------------------------------------------------------------------------
# ZIP processing and DB loading
# ---------------------------------------------------------------------------

def _state_abbr_from_zip(zf: zipfile.ZipFile) -> str:
    """Extract the state abbreviation from the ZIP's path structure.

    Returns the first two-letter top-level directory name found among the
    archive members (upper-cased), or ``"??"`` when there is none.
    """
    for name in zf.namelist():
        top, sep, _rest = name.partition("/")
        # Require an actual directory component: a two-character file sitting
        # at the archive root is not a state folder.
        if sep and len(top) == 2 and top.isalpha():
            return top.upper()
    return "??"
def process_dataset(
    session: dict,
    zip_bytes: bytes,
    conn,
    state_abbr: Optional[str] = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> int:
    """Parse all bill JSONs from a ZIP and upsert into legiscan_bills. Returns count.

    Args:
        session: Session dict; only ``session_id`` is read.
        zip_bytes: Raw bytes of the dataset ZIP.
        conn: Open psycopg2 connection (may be None when ``dry_run`` is True).
        state_abbr: Fallback state for bills lacking a ``state`` field; derived
            from the ZIP's directory layout when not given.
        dry_run: Parse and count only; no database access.
        verbose: Log a per-session summary line after loading.

    Returns:
        Number of bills parsed from the archive (not the number of rows
        changed). 0 when the archive contains no bill files.

    Raises:
        zipfile.BadZipFile: If ``zip_bytes`` is not a readable ZIP.
        psycopg2.Error: On database failure. Nothing is committed in that
            case; the caller is expected to roll back.
    """
    session_id = session["session_id"]

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        if not state_abbr:
            state_abbr = _state_abbr_from_zip(zf)

        bill_files = [n for n in zf.namelist() if "/bill/" in n and n.endswith(".json")]
        if not bill_files:
            log.warning(f" Session {session_id}: no bill files found in ZIP.")
            return 0

        rows = []
        for fname in bill_files:
            try:
                raw = json.loads(zf.read(fname))
                b = raw.get("bill", raw)
                # Looked up inside the try so one record without an id is
                # skipped instead of aborting the whole session.
                bill_id = b["bill_id"]
            except Exception as e:
                log.warning(f" Could not parse {fname}: {e}")
                continue

            subjects = [s["subject_name"] for s in (b.get("subjects") or []) if s.get("subject_name")]
            is_rel, tags = score_relevance(
                b.get("title", ""),
                b.get("description", ""),
                subjects,
            )
            # Empty strings become NULL; the column is a DATE.
            status_date = b.get("status_date") or None

            # Column order must match the INSERT list below.
            rows.append((
                bill_id,
                session_id,
                b.get("state", state_abbr),
                b.get("bill_number"),
                b.get("bill_type"),
                b.get("title"),
                b.get("description"),
                b.get("status"),
                status_date,
                b.get("completed", 0),
                b.get("body"),
                b.get("url"),
                b.get("state_link"),
                b.get("change_hash"),
                subjects or None,
                len(b.get("sponsors") or []),
                len(b.get("votes") or []),
                len(b.get("texts") or []),
                is_rel,          # index 18, read back for the verbose summary
                tags or None,
            ))

    if dry_run:
        log.info(f" [dry-run] Session {session_id} ({state_abbr}): would insert/update {len(rows)} bills")
        return len(rows)

    # Rows whose change_hash is unchanged are left alone (WHERE clause). When
    # the hash did change, the descriptive columns are refreshed as well, so
    # the stored title/description stay in step with the tags derived from them.
    UPSERT = """
        INSERT INTO legiscan_bills (
            bill_id, session_id, state, bill_number, bill_type, title, description,
            status, status_date, completed, body, url, state_link, change_hash,
            subjects, sponsor_count, vote_count, text_count,
            is_relevant, relevance_tags, imported_at
        ) VALUES %s
        ON CONFLICT (bill_id) DO UPDATE SET
            bill_number = EXCLUDED.bill_number,
            bill_type = EXCLUDED.bill_type,
            title = EXCLUDED.title,
            description = EXCLUDED.description,
            body = EXCLUDED.body,
            url = EXCLUDED.url,
            state_link = EXCLUDED.state_link,
            change_hash = EXCLUDED.change_hash,
            status = EXCLUDED.status,
            status_date = EXCLUDED.status_date,
            completed = EXCLUDED.completed,
            subjects = EXCLUDED.subjects,
            sponsor_count = EXCLUDED.sponsor_count,
            vote_count = EXCLUDED.vote_count,
            text_count = EXCLUDED.text_count,
            is_relevant = EXCLUDED.is_relevant,
            relevance_tags = EXCLUDED.relevance_tags,
            imported_at = NOW()
        WHERE legiscan_bills.change_hash IS DISTINCT FROM EXCLUDED.change_hash
    """
    template = "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())"
    page_size = 500

    # execute_values runs one statement per page and cursor.rowcount only
    # describes the last statement, so page explicitly and add up the counts.
    count = 0
    with conn.cursor() as cur:
        for start in range(0, len(rows), page_size):
            page = rows[start:start + page_size]
            psycopg2.extras.execute_values(cur, UPSERT, page, template=template, page_size=len(page))
            count += cur.rowcount

    # Update session bill_count
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE legiscan_sessions SET bill_count = %s, imported_at = NOW() WHERE session_id = %s",
            (len(rows), session_id),
        )
    conn.commit()

    if verbose:
        relevant = sum(1 for r in rows if r[18])
        log.info(f" Session {session_id} ({state_abbr}): {len(rows)} bills, {relevant} relevant, {count} upserted")

    return len(rows)


def upsert_session(session: dict, state_abbr: str, conn, dry_run: bool = False, commit: bool = True):
    """Insert or update a session record.

    Args:
        session: Session dict from ``getDatasetList``.
        state_abbr: Two-letter state code stored in ``state_abbr``.
        conn: Open psycopg2 connection.
        dry_run: When True, do nothing.
        commit: Commit after the statement (default). Pass False to keep the
            upsert in the caller's transaction, e.g. so the new dataset_hash
            only becomes visible together with the bills it describes.
    """
    if dry_run:
        return
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO legiscan_sessions
                (session_id, state_id, state_abbr, year_start, year_end,
                 session_title, session_tag, is_special, is_prior,
                 dataset_hash, dataset_date, dataset_size_mb)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            ON CONFLICT (session_id) DO UPDATE SET
                dataset_hash = EXCLUDED.dataset_hash,
                dataset_date = EXCLUDED.dataset_date,
                dataset_size_mb = EXCLUDED.dataset_size_mb,
                session_title = EXCLUDED.session_title
        """, (
            session["session_id"],
            session["state_id"],
            state_abbr,
            session["year_start"],
            session["year_end"],
            session.get("session_title"),
            session.get("session_tag"),
            bool(session.get("special")),
            bool(session.get("prior")),
            session.get("dataset_hash"),
            session.get("dataset_date"),
            # ``or 0`` also covers an explicit null in the API payload.
            (session.get("dataset_size") or 0) / 1e6,
        ))
    if commit:
        conn.commit()


def needs_import(session: dict, conn) -> bool:
    """Return True if this session's dataset_hash differs from what's in the DB.

    A session with no row in legiscan_sessions also needs importing.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT dataset_hash FROM legiscan_sessions WHERE session_id = %s",
            (session["session_id"],),
        )
        row = cur.fetchone()
    if row is None:
        return True
    return row[0] != session["dataset_hash"]


# ---------------------------------------------------------------------------
# Retag phase
# ---------------------------------------------------------------------------

def retag_all_bills(conn, dry_run: bool = False, verbose: bool = False):
    """Re-score relevance for every bill already in the DB.

    Args:
        conn: Open psycopg2 connection. If None (main() opens no connection
            in dry-run mode) there is nothing to read, so the function logs
            that and returns.
        dry_run: Score the bills and report the count, but write nothing.
        verbose: Accepted for signature parity with the other phases; unused.
    """
    log.info("Re-tagging all bills…")
    if conn is None:
        log.info(" [dry-run] No database connection; skipping re-tag.")
        return

    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute("SELECT bill_id, title, description, subjects FROM legiscan_bills")
        rows = cur.fetchall()

    log.info(f" Scoring {len(rows)} bills…")
    updates = []
    for row in rows:
        is_rel, tags = score_relevance(
            row["title"] or "",
            row["description"] or "",
            row["subjects"] or [],
        )
        updates.append((is_rel, tags or None, row["bill_id"]))

    relevant = sum(1 for u in updates if u[0])
    if dry_run:
        log.info(f" [dry-run] Would tag {relevant}/{len(updates)} bills as relevant")
        return

    with conn.cursor() as cur:
        # The ::text[] cast in the template gives the VALUES column a type
        # even when a row's tag list is NULL.
        psycopg2.extras.execute_values(
            cur,
            "UPDATE legiscan_bills SET is_relevant = data.is_rel, relevance_tags = data.tags "
            "FROM (VALUES %s) AS data(is_rel, tags, bill_id) "
            "WHERE legiscan_bills.bill_id = data.bill_id::integer",
            updates,
            template="(%s, %s::text[], %s)",
        )
    conn.commit()

    log.info(f" Tagged {relevant}/{len(updates)} bills as relevant.")


# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------

def print_summary(conn):
    """Print table counts, bills per relevance tag and the top 15 states."""
    queries = {
        "Total sessions": "SELECT COUNT(*) FROM legiscan_sessions",
        "Total bills": "SELECT COUNT(*) FROM legiscan_bills",
        "Relevant bills": "SELECT COUNT(*) FROM legiscan_bills WHERE is_relevant",
        "States covered": "SELECT COUNT(DISTINCT state) FROM legiscan_bills",
    }
    print("\n--- LegiScan ingestion summary ---")
    with conn.cursor() as cur:
        for label, sql in queries.items():
            cur.execute(sql)
            print(f" {label}: {cur.fetchone()[0]:,}")

    # Top relevance tags
    with conn.cursor() as cur:
        cur.execute("""
            SELECT tag, COUNT(*) AS n
            FROM legiscan_bills, unnest(relevance_tags) AS tag
            GROUP BY tag ORDER BY n DESC
        """)
        rows = cur.fetchall()
    if rows:
        print("\n Relevant bills by tag:")
        for tag, n in rows:
            print(f" {tag:<30} {n:>6,}")

    # Top states for relevant bills
    with conn.cursor() as cur:
        cur.execute("""
            SELECT state, COUNT(*) AS n
            FROM legiscan_bills WHERE is_relevant
            GROUP BY state ORDER BY n DESC LIMIT 15
        """)
        rows = cur.fetchall()
    if rows:
        print("\n Top states by relevant bill count:")
        for state, n in rows:
            print(f" {state} {n:>5,}")
    print()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def parse_args():
    """Parse the command line; the module docstring is the help text."""
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--all", action="store_true", help="Run setup-db + fetch + load + tag")
    p.add_argument("--setup-db", action="store_true", help="Create/update DB tables")
    p.add_argument("--fetch", action="store_true", help="Download dataset ZIPs")
    p.add_argument("--load", action="store_true", help="Load cached ZIPs into DB")
    p.add_argument("--tag", action="store_true", help="Retag all bills for relevance")
    # Upper-cased so "ca" and "CA" end up as the same value in the database.
    p.add_argument("--state", default=None, type=str.upper, metavar="XX", help="Limit to one state")
    p.add_argument("--year-start", type=int, default=MIN_YEAR_DEFAULT, dest="year_start")
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--verbose", action="store_true")
    return p.parse_args()


def _load_session(session: dict, zip_bytes: bytes, conn, state_abbr: Optional[str], args) -> int:
    """Write one session row and its bills as a single transaction; return the bill count."""
    if conn is not None and state_abbr:
        # Not committed here: the new dataset_hash must only become visible
        # together with the bills, otherwise a failed load would make
        # needs_import() report the session as up to date forever.
        upsert_session(session, state_abbr, conn, dry_run=args.dry_run, commit=False)
    loaded = process_dataset(
        session, zip_bytes, conn,
        state_abbr=state_abbr,
        dry_run=args.dry_run,
        verbose=args.verbose,
    )
    if conn is not None:
        # process_dataset returns before its own commit when the archive has
        # no bill files; make sure the session row is persisted in that case.
        conn.commit()
    return loaded


def _ingest_sessions(args, conn, do_fetch: bool, do_load: bool) -> None:
    """Run the fetch and/or load phase for every session matching the filters."""
    sessions = get_all_dataset_metadata(args.year_start, state_filter=args.state)
    total = len(sessions)
    log.info(f"Processing {total} sessions (year_start ≥ {args.year_start})…")

    total_bills = 0
    skipped = 0

    for i, session in enumerate(sessions, 1):
        session_id = session["session_id"]
        title = session.get("session_title", "")

        # Check if import needed
        if do_load and not args.dry_run and conn and not needs_import(session, conn):
            log.debug(f" [{i}/{total}] Session {session_id} ({title}) — hash unchanged, skipping.")
            skipped += 1
            continue

        log.info(f"[{i}/{total}] Session {session_id}: {title}")

        # Download
        zip_bytes = None
        if do_fetch:
            try:
                zip_bytes, api_called = download_dataset_zip(session, dry_run=args.dry_run)
                if api_called:
                    time.sleep(RATE_LIMIT_DELAY)
            except Exception as e:
                log.error(f" Download failed for session {session_id}: {e}")
                continue
        elif do_load:
            # Load from cache only
            cache_path = CACHE_DIR / f"{session_id}_{session['dataset_hash']}.zip"
            if not cache_path.exists():
                log.warning(f" Cache miss for session {session_id} — run --fetch first.")
                continue
            zip_bytes = cache_path.read_bytes()

        # Derive state abbreviation from ZIP structure
        state_abbr = args.state
        if zip_bytes and not state_abbr:
            try:
                with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
                    state_abbr = _state_abbr_from_zip(zf)
            except Exception as e:
                # Best effort; an unreadable archive fails again (and is
                # reported as a load failure) in the load step below.
                log.debug(f" Could not read ZIP for session {session_id}: {e}")
                state_abbr = "??"

        # Upsert session record + load bills
        if do_load and zip_bytes:
            try:
                total_bills += _load_session(session, zip_bytes, conn, state_abbr, args)
            except Exception as e:
                log.error(f" Load failed for session {session_id}: {e}")
                if conn:
                    conn.rollback()

    log.info(f"Fetch/load complete. Bills processed: {total_bills:,}. Skipped (up-to-date): {skipped}.")


def main():
    """Entry point: parse arguments and run the requested phases in order."""
    args = parse_args()

    if args.verbose:
        log.setLevel(logging.DEBUG)

    do_setup = args.all or args.setup_db
    do_fetch = args.all or args.fetch
    do_load = args.all or args.load
    do_tag = args.all or args.tag

    if not any([do_setup, do_fetch, do_load, do_tag]):
        log.error("Specify at least one phase: --all, --setup-db, --fetch, --load, --tag")
        sys.exit(1)

    # Only the fetch/load phases talk to the LegiScan API.
    if (do_fetch or do_load) and not API_KEY:
        log.error("LEGISCAN_API_KEY is not set.")
        sys.exit(1)

    conn = None if args.dry_run else get_db_connection()
    try:
        # ── Setup ──────────────────────────────────────────────────────────────
        if do_setup:
            if args.dry_run:
                log.info("[dry-run] Would create legiscan_sessions and legiscan_bills tables.")
            else:
                setup_db(conn)

        # ── Fetch + Load (interleaved per session for memory efficiency) ────────
        if do_fetch or do_load:
            _ingest_sessions(args, conn, do_fetch, do_load)

        # ── Tag ────────────────────────────────────────────────────────────────
        if do_tag:
            if do_load:
                # Bills are scored as they are loaded, so the full re-score
                # is not repeated in the same run.
                log.info("Tag phase skipped: bills were tagged during load (run --tag alone to re-score all bills).")
            else:
                retag_all_bills(conn, dry_run=args.dry_run, verbose=args.verbose)

        # ── Summary ────────────────────────────────────────────────────────────
        if conn is not None:
            print_summary(conn)
    finally:
        # Close the connection on every exit path, including errors.
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    main()