From 4525ea3f9707ec3248f3ca3e6e91b985c3d8afbc Mon Sep 17 00:00:00 2001 From: dadams Date: Wed, 27 May 2026 21:30:31 -0700 Subject: [PATCH] Add LegiScan legislation ingestion and analysis queries Adds ingest_legiscan.py to pull all US state + federal bills (2016-2026) from the LegiScan API into legiscan_sessions and legiscan_bills tables. Bills are keyword-tagged across 8 research categories (data_center, ratepayer_protection, large_load, grid_impact, tax_incentive, etc.). Loads ~1.3M bills; ~60K tagged relevant. Adds query_legiscan_bills.sql with pre-built analysis queries including state/DC joins. Updates database-tables.md, README.md, and research-ideas.md accordingly. Co-Authored-By: Claude Sonnet 4.6 --- README.md | 8 + database-tables.md | 82 ++++- ingest_legiscan.py | 686 +++++++++++++++++++++++++++++++++++++++ query_legiscan_bills.sql | 217 +++++++++++++ research-ideas.md | 54 +++ 5 files changed, 1046 insertions(+), 1 deletion(-) create mode 100644 ingest_legiscan.py create mode 100644 query_legiscan_bills.sql diff --git a/README.md b/README.md index 342fd59..2af2a7c 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ Facilities in DBSCAN clusters differ significantly from isolated sites: - **NOAA HMS Smoke Data**: Wildfire smoke exposure (2005-2025) - **USDM Drought Data**: Drought severity - **Utility Rate Tracker**: State-level electricity rate increases +- **LegiScan Legislative Data**: All US state + federal bills 2016–2026 (1.3M bills, 646 sessions), tagged for data center, ratepayer, grid, tax, and siting topics ## Repository Structure @@ -99,6 +100,7 @@ Facilities in DBSCAN clusters differ significantly from isolated sites: - `load_postgis_internet_cables.py` - Load submarine cables and landing points - `ingest_eia_energy_layers.py` - Ingest EIA energy data via API - `build_watershed_huc8_tables.py` - Load USGS HUC8 watersheds +- `ingest_legiscan.py` - Download all US state/federal bills 2016–2026 via LegiScan API, tag for data center research topics **Enrichment** - `create_data_center_census_tract_table.py` - Join data centers to Census tracts with ACS demographics @@ -151,6 +153,7 @@ Credentials stored in `~/.zsh_secrets`, loaded via environment variables: - `FCC_USERNAME`, `FCC_API_KEY`: FCC broadband data - `RDH_USERNAME`, `RDH_PASSWORD`: Redistricting Data Hub - `CENSUS_API_KEY`: Census ACS API +- `LEGISCAN_API_KEY`: LegiScan legislative data ## Quick Start @@ -174,6 +177,11 @@ python3 analyze_cables_concentration.py > output/cables_analysis.txt # 4. Execute notebooks jupyter notebook cluster_analysis.ipynb + +# 5. Load legislation (all states, 2016-2026) +python3 ingest_legiscan.py --all +# Weekly refresh (skips unchanged sessions): +python3 ingest_legiscan.py --fetch --load ``` ### Generate Maps diff --git a/database-tables.md b/database-tables.md index 89987f2..b9ae28f 100644 --- a/database-tables.md +++ b/database-tables.md @@ -13,11 +13,12 @@ ## Table Organization -Tables are organized into four categories: +Tables are organized into five categories: 1. **Core Data Center Tables** - Master inventories and source data 2. **Enrichment Tables** - Data centers joined with contextual data 3. **Base Layer Tables** - Geographic and demographic reference layers 4. **Infrastructure Tables** - Energy and connectivity infrastructure +5. **Legislation Tables** - LegiScan state and federal bill data (2016-2026) --- @@ -499,6 +500,85 @@ Tables are organized into four categories: --- +--- + +## Legislation Tables + +Populated by `ingest_legiscan.py` using the [LegiScan API](https://legiscan.com/legiscan). +Covers all 50 states + DC + US Congress, sessions from 2016 through 2026. +Data licensed [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/) — attribute LegiScan LLC. + +### `legiscan_sessions` +**Rows**: 646 +**Purpose**: One row per legislative session dataset downloaded from LegiScan + +**Key Columns**: +- `session_id` (INTEGER) - LegiScan session ID (PRIMARY KEY) +- `state_abbr` (VARCHAR) - Two-letter state code (`CA`, `TX`, `US`, etc.) +- `state_id` (INTEGER) - LegiScan numeric state ID +- `year_start`, `year_end` (INTEGER) - Session year range +- `session_title` (TEXT) - Full session name +- `session_tag` (TEXT) - Short tag (e.g., "Regular Session", "1st Special Session") +- `is_special` (BOOLEAN) - True for special/extraordinary sessions +- `is_prior` (BOOLEAN) - True for completed/sine-die sessions +- `dataset_hash` (VARCHAR) - MD5 of dataset ZIP; used to detect updates +- `dataset_date` (DATE) - Date dataset was last published by LegiScan +- `dataset_size_mb` (FLOAT) - Compressed ZIP size +- `bill_count` (INTEGER) - Number of bills loaded from this session +- `imported_at` (TIMESTAMPTZ) - When this session was last imported + +### `legiscan_bills` +**Rows**: ~1,313,000 +**Purpose**: All bills from all sessions; tagged for relevance to data center research topics + +**Key Columns**: +- `bill_id` (INTEGER) - LegiScan bill ID (PRIMARY KEY) +- `session_id` (INTEGER) - FK → `legiscan_sessions` +- `state` (VARCHAR) - Two-letter state code +- `bill_number` (VARCHAR) - Bill number (e.g., `SB 1000`, `HB 233`) +- `bill_type` (VARCHAR) - `B`=Bill, `R`=Resolution, `CR`=Concurrent Resolution, etc. +- `title` (TEXT) - Short title +- `description` (TEXT) - Longer description +- `status` (INTEGER) - Current status code (see below) +- `status_date` (DATE) - Date of last status change +- `completed` (INTEGER) - 1 if bill is in a terminal state +- `body` (VARCHAR) - Originating chamber (`H`=House, `S`=Senate, `C`=Council, etc.) +- `url` (TEXT) - LegiScan bill page URL +- `state_link` (TEXT) - Official state legislature URL +- `change_hash` (VARCHAR) - MD5 used to detect bill-level updates +- `subjects` (TEXT[]) - LegiScan subject tags (GIN indexed) +- `sponsor_count` (INTEGER) - Number of sponsors +- `vote_count` (INTEGER) - Number of recorded votes +- `text_count` (INTEGER) - Number of bill text versions +- `is_relevant` (BOOLEAN) - True if any relevance tag matched (GIN indexed) +- `relevance_tags` (TEXT[]) - Matched topic tags (GIN indexed) +- `imported_at` (TIMESTAMPTZ) - When this bill was last upserted + +**Status codes**: 1=Introduced, 2=Engrossed, 3=Enrolled, 4=Passed, 5=Vetoed, 6=Failed, 7=Override, 8=Chaptered, 9=Referred, 12=Draft + +**Relevance tags** (keyword-matched against title + description + subjects): + +| Tag | What it captures | +|-----|-----------------| +| `data_center` | Data centers, hyperscale, colocation, AI campuses, HPC facilities | +| `large_load` | Crypto mining, large industrial loads, extraordinary load | +| `ratepayer_protection` | Cost shifting, cross-subsidy, rate design, affordability, rate class | +| `grid_impact` | Grid reliability, transmission, interconnection queue, IRP | +| `tax_incentive` | Tax exemptions, abatements, credits for facilities | +| `energy_policy` | Renewable PPAs, green tariffs, clean electricity, decarbonization | +| `water_use` | Cooling water, evaporative cooling, water footprint | +| `siting_permitting` | Zoning, conditional use permits, local control, preemption | + +**Notes**: +- ~60,000 relevant bills out of 1.3M total (~4.6%) +- `data_center` tag: ~2,182 bills; `ratepayer_protection`: ~49,000 +- GIN indexes on `subjects`, `relevance_tags`, and full-text (`title || description`) +- Use `query_legiscan_bills.sql` for pre-built research queries +- Re-run `python ingest_legiscan.py --fetch --load` weekly to pick up dataset updates +- Re-run `python ingest_legiscan.py --tag` after editing keyword lists + +--- + ## Commonly Used Joins ### Data Center to Demographics diff --git a/ingest_legiscan.py b/ingest_legiscan.py new file mode 100644 index 0000000..87ff943 --- /dev/null +++ b/ingest_legiscan.py @@ -0,0 +1,686 @@ +#!/usr/bin/env python3 +""" +Ingest LegiScan legislative datasets for all US states (2016-2026) into PostgreSQL. + +Fetches all state session datasets from the LegiScan API, parses bill JSONs from +each ZIP archive, and loads them into the data_centers PostgreSQL database. Bills are +tagged with relevance categories (data_center, large_load, ratepayer_protection, etc.). + +Usage: + python ingest_legiscan.py [--all | --setup-db | --fetch | --load | --tag] + [--state XX] [--year-start YYYY] [--dry-run] [--verbose] + +Options: + --all Run all phases in sequence + --setup-db Create/update database tables and indexes + --fetch Download dataset ZIPs for all states (uses hash caching) + --load Parse cached ZIPs and insert/update bills in DB + --tag (Re)apply relevance tagging to all loaded bills + --state XX Restrict to one state (e.g., CA) + --year-start N Earliest session year to include (default: 2016) + --dry-run Print what would be done; no API calls or DB writes + --verbose Extra progress output + +Environment: + LEGISCAN_API_KEY Required + PGWEB_HOST, PGWEB_PORT, + PGWEB_USER, PGWEB_PASSWORD PostgreSQL connection (DB: data_centers) +""" + +import argparse +import base64 +import io +import json +import logging +import os +import sys +import time +import zipfile +from datetime import datetime +from pathlib import Path +from typing import Optional + +import psycopg2 +import psycopg2.extras +import requests + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +DB_NAME = "data_centers" +API_KEY = os.environ.get("LEGISCAN_API_KEY") +API_BASE = "https://api.legiscan.com/" +CACHE_DIR = Path("data/legiscan_cache") +MIN_YEAR_DEFAULT = 2016 +RATE_LIMIT_DELAY = 0.5 # seconds between API calls + +# Keyword categories for relevance tagging. +# Keys become the tag values stored in legiscan_bills.relevance_tags[]. +RELEVANCE_KEYWORDS: dict[str, list[str]] = { + "data_center": [ + "data center", "data centre", "hyperscale", "colocation", "colo facility", + "server farm", "cloud computing facility", "internet exchange", + "carrier hotel", "artificial intelligence facility", "ai campus", + "ai data center", "gpu cluster", "compute facility", + "high performance computing", "hpc facility", "data hall", + "network access point", "data warehousing facility", + ], + "large_load": [ + "large load", "large power consumer", "large electricity consumer", + "high electricity consumption", "high power consumption", + "megawatt load", "gigawatt load", "cryptocurrency mining", + "bitcoin mining", "blockchain mining", "crypto mining", + "digital asset mining", "proof of work", "electric arc furnace", + "large industrial customer", "high-density load", "new large load", + "load growth", "extraordinary load", + ], + "ratepayer_protection": [ + "ratepayer", "rate payer", "cost shift", "cost shifting", + "cost allocation", "cross-subsidy", "cross subsidy", + "rate design", "rate structure", "electricity rate", + "electric rate", "utility rate", "rate increase", "rate burden", + "rate base", "stranded cost", "rate class", "customer protection", + "consumer protection", "electric customer", "residential customer", + "demand charge", "transmission cost", "grid upgrade cost", + "interconnection cost", "cost recovery", "rate relief", + "affordability", "energy burden", + ], + "grid_impact": [ + "grid reliability", "grid stability", "grid congestion", + "grid modernization", "grid infrastructure", "electric grid", + "power grid", "electricity grid", "transmission upgrade", + "transmission expansion", "interconnection queue", + "interconnection study", "demand response", "curtailment", + "grid capacity", "system reliability", "capacity expansion", + "electric system", "power system reliability", "grid resilience", + "grid planning", "integrated resource plan", + ], + "water_use": [ + "water consumption", "cooling water", "water efficiency", + "water use effectiveness", "evaporative cooling", + "water withdrawal", "water discharge", "water impact", + "water footprint", "cooling tower", "water-cooled", + "once-through cooling", "recycled water", "water stress", + "water scarcity", + ], + "tax_incentive": [ + "tax credit", "tax exemption", "tax abatement", "tax incentive", + "sales tax exemption", "property tax exemption", "tax break", + "tax relief", "enterprise zone", "economic incentive", + "business incentive", "investment credit", "job creation credit", + "economic development incentive", "opportunity zone", + "tax subsidy", + ], + "energy_policy": [ + "renewable energy", "clean energy", "energy efficiency", + "power purchase agreement", " ppa ", "green tariff", + "clean power", "carbon neutral", "net zero", "decarbonization", + "energy procurement", "24/7 clean energy", "carbon-free", + "clean electricity", "energy storage", "virtual power plant", + "net metering", "green power", + ], + "siting_permitting": [ + "conditional use permit", "special use permit", "land use permit", + "zoning", "facility siting", "environmental review", + "environmental impact", "noise ordinance", "setback requirement", + "building permit", "construction permit", "site approval", + "local approval", "permit requirement", "permitting process", + "local control", "preemption", + ], +} + +# Status code labels (LegiScan) +STATUS_LABELS = { + 0: "N/A", 1: "Introduced", 2: "Engrossed", 3: "Enrolled", + 4: "Passed", 5: "Vetoed", 6: "Failed", 7: "Override", + 8: "Chaptered", 9: "Referred", 10: "Report Pass", + 11: "Report DNP", 12: "Draft", +} + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Database +# --------------------------------------------------------------------------- + +def get_db_connection(): + return psycopg2.connect( + host=os.environ["PGWEB_HOST"], + port=os.environ["PGWEB_PORT"], + user=os.environ["PGWEB_USER"], + password=os.environ["PGWEB_PASSWORD"], + dbname=DB_NAME, + ) + + +DDL = """ +CREATE TABLE IF NOT EXISTS legiscan_sessions ( + session_id INTEGER PRIMARY KEY, + state_id INTEGER NOT NULL, + state_abbr VARCHAR(2) NOT NULL, + year_start INTEGER NOT NULL, + year_end INTEGER NOT NULL, + session_title TEXT, + session_tag TEXT, + is_special BOOLEAN DEFAULT FALSE, + is_prior BOOLEAN DEFAULT FALSE, + dataset_hash VARCHAR(32), + dataset_date DATE, + dataset_size_mb FLOAT, + bill_count INTEGER DEFAULT 0, + imported_at TIMESTAMPTZ +); + +CREATE TABLE IF NOT EXISTS legiscan_bills ( + bill_id INTEGER PRIMARY KEY, + session_id INTEGER REFERENCES legiscan_sessions(session_id), + state VARCHAR(2) NOT NULL, + bill_number VARCHAR(50), + bill_type VARCHAR(10), + title TEXT, + description TEXT, + status INTEGER, + status_date DATE, + completed INTEGER DEFAULT 0, + body VARCHAR(10), + url TEXT, + state_link TEXT, + change_hash VARCHAR(32), + subjects TEXT[], + sponsor_count INTEGER DEFAULT 0, + vote_count INTEGER DEFAULT 0, + text_count INTEGER DEFAULT 0, + is_relevant BOOLEAN DEFAULT FALSE, + relevance_tags TEXT[], + imported_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ls_bills_state ON legiscan_bills(state); +CREATE INDEX IF NOT EXISTS idx_ls_bills_session ON legiscan_bills(session_id); +CREATE INDEX IF NOT EXISTS idx_ls_bills_status ON legiscan_bills(status); +CREATE INDEX IF NOT EXISTS idx_ls_bills_relevant ON legiscan_bills(is_relevant) WHERE is_relevant; +CREATE INDEX IF NOT EXISTS idx_ls_bills_subjects ON legiscan_bills USING gin(subjects); +CREATE INDEX IF NOT EXISTS idx_ls_bills_rtags ON legiscan_bills USING gin(relevance_tags); +CREATE INDEX IF NOT EXISTS idx_ls_bills_fts ON legiscan_bills + USING gin(to_tsvector('english', + COALESCE(title, '') || ' ' || COALESCE(description, ''))); +""" + + +def setup_db(conn): + with conn.cursor() as cur: + cur.execute(DDL) + conn.commit() + log.info("Database tables and indexes ready.") + + +# --------------------------------------------------------------------------- +# LegiScan API helpers +# --------------------------------------------------------------------------- + +def _api_get(params: dict, timeout: int = 120) -> dict: + """Make one LegiScan API call and return the parsed JSON.""" + params["key"] = API_KEY + resp = requests.get(API_BASE, params=params, timeout=timeout) + resp.raise_for_status() + data = resp.json() + if data.get("status") != "OK": + raise RuntimeError(f"LegiScan API error: {data}") + return data + + +def get_all_dataset_metadata(year_start: int, state_filter: Optional[str] = None) -> list[dict]: + """Fetch full dataset list (one API call), filter to year_start+.""" + log.info("Fetching dataset list from LegiScan…") + data = _api_get({"op": "getDatasetList"}) + sessions = data["datasetlist"] + log.info(f" Total sessions across all states: {len(sessions)}") + sessions = [s for s in sessions if s["year_start"] >= year_start] + if state_filter: + # Need to map state abbr → state_id. Derive from a quick per-state call. + log.info(f" Filtering to state {state_filter}…") + state_data = _api_get({"op": "getDatasetList", "state": state_filter}) + valid_ids = {s["session_id"] for s in state_data["datasetlist"]} + sessions = [s for s in sessions if s["session_id"] in valid_ids] + log.info(f" Sessions matching filters: {len(sessions)}") + return sessions + + +def download_dataset_zip(session: dict, dry_run: bool = False) -> tuple[Optional[bytes], bool]: + """Download a dataset ZIP via the API; cache to disk. + Returns (zip_bytes, api_call_made) — api_call_made is True only when the + network was actually hit so the caller can rate-limit appropriately.""" + session_id = session["session_id"] + dataset_hash = session["dataset_hash"] + access_key = session["access_key"] + + CACHE_DIR.mkdir(parents=True, exist_ok=True) + cache_path = CACHE_DIR / f"{session_id}_{dataset_hash}.zip" + + if cache_path.exists(): + log.debug(f" Cache hit: {cache_path.name}") + return cache_path.read_bytes(), False + + if dry_run: + log.info(f" [dry-run] Would download session {session_id} ({session['dataset_size'] / 1e6:.1f} MB)") + return None, False + + log.info(f" Downloading session {session_id} ({session['dataset_size'] / 1e6:.1f} MB)…") + data = _api_get({"op": "getDataset", "access_key": access_key, "id": session_id}) + zip_bytes = base64.b64decode(data["dataset"]["zip"]) + cache_path.write_bytes(zip_bytes) + log.info(f" Cached → {cache_path.name}") + return zip_bytes, True + return zip_bytes + + +# --------------------------------------------------------------------------- +# Relevance tagging +# --------------------------------------------------------------------------- + +def score_relevance(title: str, description: str, subjects: list[str]) -> tuple[bool, list[str]]: + """Return (is_relevant, list_of_matched_tags).""" + haystack = " ".join([ + (title or "").lower(), + (description or "").lower(), + " ".join(s.lower() for s in subjects), + ]) + tags = [] + for tag, keywords in RELEVANCE_KEYWORDS.items(): + if any(kw in haystack for kw in keywords): + tags.append(tag) + return bool(tags), tags + + +# --------------------------------------------------------------------------- +# ZIP processing and DB loading +# --------------------------------------------------------------------------- + +def _state_abbr_from_zip(zf: zipfile.ZipFile) -> str: + """Extract the state abbreviation from the ZIP's path structure.""" + for name in zf.namelist(): + parts = name.split("/") + if len(parts) >= 1 and len(parts[0]) == 2: + return parts[0] + return "??" + + +def process_dataset( + session: dict, + zip_bytes: bytes, + conn, + state_abbr: Optional[str] = None, + dry_run: bool = False, + verbose: bool = False, +) -> int: + """Parse all bill JSONs from a ZIP and upsert into legiscan_bills. Returns count.""" + session_id = session["session_id"] + + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + if not state_abbr: + state_abbr = _state_abbr_from_zip(zf) + bill_files = [n for n in zf.namelist() if "/bill/" in n and n.endswith(".json")] + + if not bill_files: + log.warning(f" Session {session_id}: no bill files found in ZIP.") + return 0 + + rows = [] + for fname in bill_files: + try: + raw = json.loads(zf.read(fname)) + b = raw.get("bill", raw) + except Exception as e: + log.warning(f" Could not parse {fname}: {e}") + continue + + subjects = [s["subject_name"] for s in (b.get("subjects") or []) if s.get("subject_name")] + is_rel, tags = score_relevance( + b.get("title", ""), + b.get("description", ""), + subjects, + ) + + status_date = b.get("status_date") or None + rows.append(( + b["bill_id"], + session_id, + b.get("state", state_abbr), + b.get("bill_number"), + b.get("bill_type"), + b.get("title"), + b.get("description"), + b.get("status"), + status_date, + b.get("completed", 0), + b.get("body"), + b.get("url"), + b.get("state_link"), + b.get("change_hash"), + subjects or None, + len(b.get("sponsors") or []), + len(b.get("votes") or []), + len(b.get("texts") or []), + is_rel, + tags or None, + )) + + if dry_run: + log.info(f" [dry-run] Session {session_id} ({state_abbr}): would insert/update {len(rows)} bills") + return len(rows) + + UPSERT = """ + INSERT INTO legiscan_bills ( + bill_id, session_id, state, bill_number, bill_type, + title, description, status, status_date, completed, + body, url, state_link, change_hash, subjects, + sponsor_count, vote_count, text_count, + is_relevant, relevance_tags, imported_at + ) VALUES %s + ON CONFLICT (bill_id) DO UPDATE SET + change_hash = EXCLUDED.change_hash, + status = EXCLUDED.status, + status_date = EXCLUDED.status_date, + completed = EXCLUDED.completed, + subjects = EXCLUDED.subjects, + sponsor_count = EXCLUDED.sponsor_count, + vote_count = EXCLUDED.vote_count, + text_count = EXCLUDED.text_count, + is_relevant = EXCLUDED.is_relevant, + relevance_tags = EXCLUDED.relevance_tags, + imported_at = NOW() + WHERE legiscan_bills.change_hash IS DISTINCT FROM EXCLUDED.change_hash + """ + + template = "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())" + + with conn.cursor() as cur: + psycopg2.extras.execute_values(cur, UPSERT, rows, template=template, page_size=500) + count = cur.rowcount + + # Update session bill_count + with conn.cursor() as cur: + cur.execute( + "UPDATE legiscan_sessions SET bill_count = %s, imported_at = NOW() WHERE session_id = %s", + (len(rows), session_id), + ) + conn.commit() + + if verbose: + relevant = sum(1 for r in rows if r[18]) + log.info(f" Session {session_id} ({state_abbr}): {len(rows)} bills, {relevant} relevant, {count} upserted") + return len(rows) + + +def upsert_session(session: dict, state_abbr: str, conn, dry_run: bool = False): + """Insert or update a session record.""" + if dry_run: + return + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO legiscan_sessions + (session_id, state_id, state_abbr, year_start, year_end, + session_title, session_tag, is_special, is_prior, + dataset_hash, dataset_date, dataset_size_mb) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (session_id) DO UPDATE SET + dataset_hash = EXCLUDED.dataset_hash, + dataset_date = EXCLUDED.dataset_date, + dataset_size_mb = EXCLUDED.dataset_size_mb, + session_title = EXCLUDED.session_title + """, ( + session["session_id"], + session["state_id"], + state_abbr, + session["year_start"], + session["year_end"], + session.get("session_title"), + session.get("session_tag"), + bool(session.get("special")), + bool(session.get("prior")), + session.get("dataset_hash"), + session.get("dataset_date"), + session.get("dataset_size", 0) / 1e6, + )) + conn.commit() + + +def needs_import(session: dict, conn) -> bool: + """Return True if this session's dataset_hash differs from what's in the DB.""" + with conn.cursor() as cur: + cur.execute( + "SELECT dataset_hash FROM legiscan_sessions WHERE session_id = %s", + (session["session_id"],), + ) + row = cur.fetchone() + if row is None: + return True + return row[0] != session["dataset_hash"] + + +# --------------------------------------------------------------------------- +# Retag phase +# --------------------------------------------------------------------------- + +def retag_all_bills(conn, dry_run: bool = False, verbose: bool = False): + """Re-score relevance for every bill already in the DB.""" + log.info("Re-tagging all bills…") + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("SELECT bill_id, title, description, subjects FROM legiscan_bills") + rows = cur.fetchall() + + log.info(f" Scoring {len(rows)} bills…") + updates = [] + for row in rows: + is_rel, tags = score_relevance( + row["title"] or "", + row["description"] or "", + row["subjects"] or [], + ) + updates.append((is_rel, tags or None, row["bill_id"])) + + if dry_run: + relevant = sum(1 for u in updates if u[0]) + log.info(f" [dry-run] Would tag {relevant}/{len(updates)} bills as relevant") + return + + with conn.cursor() as cur: + psycopg2.extras.execute_values( + cur, + "UPDATE legiscan_bills SET is_relevant = data.is_rel, relevance_tags = data.tags " + "FROM (VALUES %s) AS data(is_rel, tags, bill_id) " + "WHERE legiscan_bills.bill_id = data.bill_id::integer", + updates, + template="(%s, %s::text[], %s)", + ) + conn.commit() + + relevant = sum(1 for u in updates if u[0]) + log.info(f" Tagged {relevant}/{len(updates)} bills as relevant.") + + +# --------------------------------------------------------------------------- +# Summary +# --------------------------------------------------------------------------- + +def print_summary(conn): + queries = { + "Total sessions": "SELECT COUNT(*) FROM legiscan_sessions", + "Total bills": "SELECT COUNT(*) FROM legiscan_bills", + "Relevant bills": "SELECT COUNT(*) FROM legiscan_bills WHERE is_relevant", + "States covered": "SELECT COUNT(DISTINCT state) FROM legiscan_bills", + } + print("\n--- LegiScan ingestion summary ---") + with conn.cursor() as cur: + for label, sql in queries.items(): + cur.execute(sql) + print(f" {label}: {cur.fetchone()[0]:,}") + + # Top relevance tags + with conn.cursor() as cur: + cur.execute(""" + SELECT tag, COUNT(*) AS n + FROM legiscan_bills, unnest(relevance_tags) AS tag + GROUP BY tag ORDER BY n DESC + """) + rows = cur.fetchall() + if rows: + print("\n Relevant bills by tag:") + for tag, n in rows: + print(f" {tag:<30} {n:>6,}") + + # Top states for relevant bills + with conn.cursor() as cur: + cur.execute(""" + SELECT state, COUNT(*) AS n + FROM legiscan_bills WHERE is_relevant + GROUP BY state ORDER BY n DESC LIMIT 15 + """) + rows = cur.fetchall() + if rows: + print("\n Top states by relevant bill count:") + for state, n in rows: + print(f" {state} {n:>5,}") + print() + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def parse_args(): + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--all", action="store_true", help="Run setup-db + fetch + load + tag") + p.add_argument("--setup-db", action="store_true", help="Create/update DB tables") + p.add_argument("--fetch", action="store_true", help="Download dataset ZIPs") + p.add_argument("--load", action="store_true", help="Load cached ZIPs into DB") + p.add_argument("--tag", action="store_true", help="Retag all bills for relevance") + p.add_argument("--state", default=None, metavar="XX", help="Limit to one state") + p.add_argument("--year-start", type=int, default=MIN_YEAR_DEFAULT, dest="year_start") + p.add_argument("--dry-run", action="store_true") + p.add_argument("--verbose", action="store_true") + return p.parse_args() + + +def main(): + args = parse_args() + if args.verbose: + log.setLevel(logging.DEBUG) + + if not API_KEY: + log.error("LEGISCAN_API_KEY is not set.") + sys.exit(1) + + do_setup = args.all or args.setup_db + do_fetch = args.all or args.fetch + do_load = args.all or args.load + do_tag = args.all or args.tag + + if not any([do_setup, do_fetch, do_load, do_tag]): + log.error("Specify at least one phase: --all, --setup-db, --fetch, --load, --tag") + sys.exit(1) + + conn = None if args.dry_run else get_db_connection() + + # ── Setup ────────────────────────────────────────────────────────────── + if do_setup: + if args.dry_run: + log.info("[dry-run] Would create legiscan_sessions and legiscan_bills tables.") + else: + setup_db(conn) + + # ── Fetch + Load (interleaved per session for memory efficiency) ──────── + if do_fetch or do_load: + sessions = get_all_dataset_metadata(args.year_start, state_filter=args.state) + total = len(sessions) + log.info(f"Processing {total} sessions (year_start ≥ {args.year_start})…") + + total_bills = 0 + skipped = 0 + + for i, session in enumerate(sessions, 1): + session_id = session["session_id"] + state_id = session["state_id"] + year_start = session["year_start"] + title = session.get("session_title", "") + + # Check if import needed + if do_load and not args.dry_run and conn and not needs_import(session, conn): + log.debug(f" [{i}/{total}] Session {session_id} ({title}) — hash unchanged, skipping.") + skipped += 1 + continue + + log.info(f"[{i}/{total}] Session {session_id}: {title}") + + # Download + zip_bytes = None + if do_fetch: + try: + zip_bytes, api_called = download_dataset_zip(session, dry_run=args.dry_run) + if api_called: + time.sleep(RATE_LIMIT_DELAY) + except Exception as e: + log.error(f" Download failed for session {session_id}: {e}") + continue + elif do_load: + # Load from cache only + cache_path = CACHE_DIR / f"{session_id}_{session['dataset_hash']}.zip" + if not cache_path.exists(): + log.warning(f" Cache miss for session {session_id} — run --fetch first.") + continue + zip_bytes = cache_path.read_bytes() + + # Derive state abbreviation from ZIP structure + state_abbr = args.state + if zip_bytes and not state_abbr: + try: + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + state_abbr = _state_abbr_from_zip(zf) + except Exception: + state_abbr = "??" + + # Upsert session record + if do_load and not args.dry_run and conn and state_abbr: + upsert_session(session, state_abbr, conn, dry_run=args.dry_run) + + # Load bills + if do_load and zip_bytes: + try: + n = process_dataset( + session, zip_bytes, conn, + state_abbr=state_abbr, + dry_run=args.dry_run, + verbose=args.verbose, + ) + total_bills += n + except Exception as e: + log.error(f" Load failed for session {session_id}: {e}") + if conn: + conn.rollback() + + log.info(f"Fetch/load complete. Bills processed: {total_bills:,}. Skipped (up-to-date): {skipped}.") + + # ── Tag ──────────────────────────────────────────────────────────────── + if do_tag and not (do_fetch or do_load): + if args.dry_run or conn: + retag_all_bills(conn, dry_run=args.dry_run, verbose=args.verbose) + + # ── Summary ──────────────────────────────────────────────────────────── + if conn and not args.dry_run: + print_summary(conn) + conn.close() + + +if __name__ == "__main__": + main() diff --git a/query_legiscan_bills.sql b/query_legiscan_bills.sql new file mode 100644 index 0000000..56ca99a --- /dev/null +++ b/query_legiscan_bills.sql @@ -0,0 +1,217 @@ +-- ============================================================ +-- LegiScan Legislative Analysis Queries +-- Database: data_centers Schema: public +-- ============================================================ +-- +-- SETUP +-- Populate the database first: +-- python ingest_legiscan.py --all +-- This downloads ~646 sessions (2016-2026, all US states + federal), +-- loads ~1.3M bills, and tags ~60K as relevant. +-- +-- To refresh (weekly dataset updates from LegiScan): +-- python ingest_legiscan.py --fetch --load +-- Already-imported sessions with unchanged dataset_hash are skipped. +-- +-- To retag after editing keyword lists in ingest_legiscan.py: +-- python ingest_legiscan.py --tag +-- +-- RELEVANCE TAGS (stored in legiscan_bills.relevance_tags[]): +-- data_center - Bills naming data centers, hyperscale, colocation, AI campuses +-- large_load - Crypto mining, large industrial loads, extraordinary load +-- ratepayer_protection- Cost shifting, cross-subsidy, rate design, affordability +-- grid_impact - Grid reliability, transmission, interconnection queue +-- tax_incentive - Tax exemptions/abatements/credits for facilities +-- energy_policy - Renewable PPAs, green tariffs, clean electricity +-- water_use - Cooling water, evaporative cooling, water footprint +-- siting_permitting - Zoning, conditional use permits, local control +-- +-- STATUS CODES (legiscan_bills.status): +-- 1=Introduced 2=Engrossed 3=Enrolled 4=Passed 5=Vetoed +-- 6=Failed 7=Override 8=Chaptered 9=Referred 12=Draft +-- ============================================================ + +-- ── Quick overview ────────────────────────────────────────── + +SELECT + COUNT(*) AS total_bills, + COUNT(*) FILTER (WHERE is_relevant) AS relevant_bills, + COUNT(DISTINCT state) AS states, + MIN(ls.year_start) AS year_from, + MAX(ls.year_end) AS year_to +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id); + +-- ── Bills per relevance tag ───────────────────────────────── + +SELECT + tag, + COUNT(*) AS bill_count, + COUNT(*) FILTER (WHERE lb.status = 4) AS passed, + COUNT(*) FILTER (WHERE lb.status IN (4,8)) AS enacted +FROM legiscan_bills lb, unnest(relevance_tags) AS tag +GROUP BY tag +ORDER BY bill_count DESC; + +-- ── Top states for relevant legislation ──────────────────── + +SELECT + state, + COUNT(*) AS relevant_bills, + COUNT(*) FILTER (WHERE 'data_center' = ANY(relevance_tags)) AS data_center, + COUNT(*) FILTER (WHERE 'large_load' = ANY(relevance_tags)) AS large_load, + COUNT(*) FILTER (WHERE 'ratepayer_protection' = ANY(relevance_tags)) AS ratepayer, + COUNT(*) FILTER (WHERE 'tax_incentive' = ANY(relevance_tags)) AS tax_incentive, + COUNT(*) FILTER (WHERE 'grid_impact' = ANY(relevance_tags)) AS grid_impact +FROM legiscan_bills +WHERE is_relevant +GROUP BY state +ORDER BY relevant_bills DESC +LIMIT 20; + +-- ── Trend by year ─────────────────────────────────────────── + +SELECT + ls.year_start AS year, + COUNT(lb.bill_id) AS total_bills, + COUNT(lb.bill_id) FILTER (WHERE lb.is_relevant) AS relevant_bills, + COUNT(lb.bill_id) FILTER (WHERE lb.is_relevant AND lb.status IN (4,8)) AS enacted, + ROUND(100.0 * COUNT(lb.bill_id) FILTER (WHERE lb.is_relevant) + / NULLIF(COUNT(lb.bill_id), 0), 1) AS pct_relevant +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +GROUP BY ls.year_start +ORDER BY ls.year_start; + +-- ── Data center bills specifically ───────────────────────── + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.relevance_tags, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE 'data_center' = ANY(lb.relevance_tags) +ORDER BY + CASE lb.status WHEN 4 THEN 0 WHEN 8 THEN 1 WHEN 3 THEN 2 ELSE 3 END, + ls.year_start DESC, + lb.state; + +-- ── Ratepayer protection bills ────────────────────────────── + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.relevance_tags, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE 'ratepayer_protection' = ANY(lb.relevance_tags) +ORDER BY + CASE lb.status WHEN 4 THEN 0 WHEN 8 THEN 1 WHEN 3 THEN 2 ELSE 3 END, + ls.year_start DESC, + lb.state; + +-- ── Bills at intersection of data center + ratepayer ─────── + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.relevance_tags, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE 'data_center' = ANY(lb.relevance_tags) + AND 'ratepayer_protection' = ANY(lb.relevance_tags) +ORDER BY ls.year_start DESC, lb.state; + +-- ── Large load + grid impact ──────────────────────────────── + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.relevance_tags, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE 'large_load' = ANY(lb.relevance_tags) + AND 'grid_impact' = ANY(lb.relevance_tags) +ORDER BY ls.year_start DESC, lb.state; + +-- ── Tax incentive bills passed/enacted ───────────────────── + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE 'tax_incentive' = ANY(lb.relevance_tags) + AND lb.status IN (4, 8) -- Passed or Chaptered +ORDER BY ls.year_start DESC, lb.state; + +-- ── Join to data centers: states with both DCs and active legislation ── + +SELECT + dc.state, + COUNT(DISTINCT dc.id) AS data_centers, + COUNT(DISTINCT lb.bill_id) AS relevant_bills, + COUNT(DISTINCT lb.bill_id) + FILTER (WHERE 'ratepayer_protection' = ANY(lb.relevance_tags)) AS ratepayer_bills, + COUNT(DISTINCT lb.bill_id) + FILTER (WHERE 'data_center' = ANY(lb.relevance_tags)) AS dc_specific_bills, + COUNT(DISTINCT lb.bill_id) + FILTER (WHERE lb.status IN (4,8)) AS enacted_bills +FROM master_data_centers dc +LEFT JOIN legiscan_bills lb ON dc.state = lb.state AND lb.is_relevant +GROUP BY dc.state +ORDER BY relevant_bills DESC; + +-- ── Full-text search: find bills mentioning specific terms ── +-- Replace 'hyperscale' with any keyword of interest + +SELECT + lb.state, + lb.bill_number, + ls.year_start AS year, + lb.status, + lb.title, + lb.description, + lb.url +FROM legiscan_bills lb +JOIN legiscan_sessions ls USING (session_id) +WHERE to_tsvector('english', COALESCE(lb.title,'') || ' ' || COALESCE(lb.description,'')) + @@ to_tsquery('english', 'hyperscale | colocation | "data center"') +ORDER BY ts_rank( + to_tsvector('english', COALESCE(lb.title,'') || ' ' || COALESCE(lb.description,'')), + to_tsquery('english', 'hyperscale | colocation | "data center"') +) DESC +LIMIT 50; + +-- ── Session coverage check ────────────────────────────────── + +SELECT + state_abbr, + COUNT(*) AS sessions_loaded, + SUM(bill_count) AS total_bills, + MIN(year_start) AS earliest, + MAX(year_end) AS latest +FROM legiscan_sessions +GROUP BY state_abbr +ORDER BY state_abbr; diff --git a/research-ideas.md b/research-ideas.md index 8e368aa..ff697d8 100644 --- a/research-ideas.md +++ b/research-ideas.md @@ -566,4 +566,58 @@ If you're interested in collaborating on any of these research directions, pleas --- +## Legislative Analysis (LegiScan Data) + +**Status**: Data loaded — 1.3M bills across all US states + federal, 2016–2026; ~60K tagged relevant. +**Tables**: `legiscan_sessions`, `legiscan_bills` +**Query file**: `query_legiscan_bills.sql` + +### Research Questions + +**1. Ratepayer Cost Shifting** +Do states with high data center density show more legislative activity on ratepayer protection? +- Join `legiscan_bills WHERE 'ratepayer_protection' = ANY(relevance_tags)` to `master_data_centers` counts by state +- Test correlation between DC concentration and # of ratepayer bills introduced/passed +- Compare outcomes: do high-DC states pass or fail more ratepayer protections? + +**2. Data Center Legislative Wave** +Is there a measurable increase in DC-specific legislation after 2022 (AI boom)? +- Trend `data_center` and `large_load` tagged bills by `year_start` +- Cross-reference with major AI facility announcements (2022–2025) + +**3. Tax Incentive Geography** +Which states enacted tax incentives that may have influenced DC location decisions? +- `tax_incentive` bills with `status IN (4,8)` (passed/chaptered) +- Overlay with `master_data_centers` growth by state over the same period +- Candidate for difference-in-differences analysis + +**4. Grid Interconnection Policy** +Do states with `grid_impact` legislation show different EIA capacity expansion patterns? +- Join relevant bills to `energy_eia_operating_generator_capacity_flat` by state +- Look for correlations between grid policy activity and nameplate MW additions + +**5. Siting Preemption vs. Local Control** +Are states passing bills to streamline or restrict local siting authority? +- Full-text search within `siting_permitting` bills for "preemption" vs. "local control" +- Map bill outcomes by state political environment (cross-ref RDH vote data) + +### Suggested Joins + +```sql +-- States with DCs and legislative activity by topic +SELECT + dc.state, + COUNT(DISTINCT dc.id) AS data_centers, + COUNT(DISTINCT lb.bill_id) FILTER (WHERE 'data_center' = ANY(relevance_tags)) AS dc_bills, + COUNT(DISTINCT lb.bill_id) FILTER (WHERE 'ratepayer_protection'= ANY(relevance_tags)) AS ratepayer_bills, + COUNT(DISTINCT lb.bill_id) FILTER (WHERE 'tax_incentive' = ANY(relevance_tags) + AND lb.status IN (4,8)) AS tax_incentives_passed +FROM master_data_centers dc +LEFT JOIN legiscan_bills lb ON dc.state = lb.state AND lb.is_relevant +GROUP BY dc.state +ORDER BY data_centers DESC; +``` + +--- + **Last Updated**: May 2026