Adds ingest_legiscan.py to pull all US state + federal bills (2016-2026) from the LegiScan API into the legiscan_sessions and legiscan_bills tables. Bills are keyword-tagged across 8 research categories (data_center, ratepayer_protection, large_load, grid_impact, tax_incentive, etc.). Loads ~1.3M bills, of which ~60K are tagged relevant. Adds query_legiscan_bills.sql with pre-built analysis queries, including state/DC joins. Updates database-tables.md, README.md, and research-ideas.md accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
687 lines · 27 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Ingest LegiScan legislative datasets for all US states (2016-2026) into PostgreSQL.
|
|
|
|
Fetches all state session datasets from the LegiScan API, parses bill JSONs from
|
|
each ZIP archive, and loads them into the data_centers PostgreSQL database. Bills are
|
|
tagged with relevance categories (data_center, large_load, ratepayer_protection, etc.).
|
|
|
|
Usage:
|
|
python ingest_legiscan.py [--all | --setup-db | --fetch | --load | --tag]
|
|
[--state XX] [--year-start YYYY] [--dry-run] [--verbose]
|
|
|
|
Options:
|
|
--all Run all phases in sequence
|
|
--setup-db Create/update database tables and indexes
|
|
--fetch Download dataset ZIPs for all states (uses hash caching)
|
|
--load Parse cached ZIPs and insert/update bills in DB
|
|
--tag (Re)apply relevance tagging to all loaded bills
|
|
--state XX Restrict to one state (e.g., CA)
|
|
--year-start N Earliest session year to include (default: 2016)
|
|
--dry-run Print what would be done; no API calls or DB writes
|
|
--verbose Extra progress output
|
|
|
|
Environment:
|
|
LEGISCAN_API_KEY Required
|
|
PGWEB_HOST, PGWEB_PORT,
|
|
PGWEB_USER, PGWEB_PASSWORD PostgreSQL connection (DB: data_centers)
|
|
"""
|
|
|
|
import argparse
import base64
import io
import json
import logging
import os
import re
import sys
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Optional

import psycopg2
import psycopg2.extras
import requests
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Target PostgreSQL database for the legiscan_* tables.
DB_NAME = "data_centers"

# LegiScan API endpoint and credentials (API_KEY is None when the variable is unset).
API_KEY = os.getenv("LEGISCAN_API_KEY")
API_BASE = "https://api.legiscan.com/"

# Downloaded dataset ZIPs are cached here, relative to the current working directory.
CACHE_DIR = Path("data") / "legiscan_cache"

MIN_YEAR_DEFAULT = 2016  # default for --year-start
RATE_LIMIT_DELAY = 0.5  # seconds to pause after a dataset download hits the API
|
|
|
|
# Keyword categories for relevance tagging.
# Keys become the tag values stored in legiscan_bills.relevance_tags[].
#
# score_relevance() matches these phrases against the lower-cased title,
# description and subject names of each bill, so keywords must be written in
# lower case. Tags are emitted in this dict's order.
# A bill is flagged is_relevant when ANY category matches.
#
# NOTE(review): several phrases are generic and probably tag many bills that
# have nothing to do with data centers. Examples: "zoning", "preemption",
# "consumer protection", "tax credit", "affordability", and "colocation",
# which is also common in telecom/wireless-siting bills. Because any tag sets
# is_relevant, consider whether these categories should count toward it.
# Precision is worth checking against a sample.
RELEVANCE_KEYWORDS: dict[str, list[str]] = {
    "data_center": [
        "data center", "data centre", "hyperscale", "colocation", "colo facility",
        "server farm", "cloud computing facility", "internet exchange",
        "carrier hotel", "artificial intelligence facility", "ai campus",
        "ai data center", "gpu cluster", "compute facility",
        "high performance computing", "hpc facility", "data hall",
        "network access point", "data warehousing facility",
    ],
    "large_load": [
        "large load", "large power consumer", "large electricity consumer",
        "high electricity consumption", "high power consumption",
        "megawatt load", "gigawatt load", "cryptocurrency mining",
        "bitcoin mining", "blockchain mining", "crypto mining",
        "digital asset mining", "proof of work", "electric arc furnace",
        "large industrial customer", "high-density load", "new large load",
        "load growth", "extraordinary load",
    ],
    "ratepayer_protection": [
        "ratepayer", "rate payer", "cost shift", "cost shifting",
        "cost allocation", "cross-subsidy", "cross subsidy",
        "rate design", "rate structure", "electricity rate",
        "electric rate", "utility rate", "rate increase", "rate burden",
        "rate base", "stranded cost", "rate class", "customer protection",
        "consumer protection", "electric customer", "residential customer",
        "demand charge", "transmission cost", "grid upgrade cost",
        "interconnection cost", "cost recovery", "rate relief",
        "affordability", "energy burden",
    ],
    "grid_impact": [
        "grid reliability", "grid stability", "grid congestion",
        "grid modernization", "grid infrastructure", "electric grid",
        "power grid", "electricity grid", "transmission upgrade",
        "transmission expansion", "interconnection queue",
        "interconnection study", "demand response", "curtailment",
        "grid capacity", "system reliability", "capacity expansion",
        "electric system", "power system reliability", "grid resilience",
        "grid planning", "integrated resource plan",
    ],
    "water_use": [
        "water consumption", "cooling water", "water efficiency",
        "water use effectiveness", "evaporative cooling",
        "water withdrawal", "water discharge", "water impact",
        "water footprint", "cooling tower", "water-cooled",
        "once-through cooling", "recycled water", "water stress",
        "water scarcity",
    ],
    "tax_incentive": [
        "tax credit", "tax exemption", "tax abatement", "tax incentive",
        "sales tax exemption", "property tax exemption", "tax break",
        "tax relief", "enterprise zone", "economic incentive",
        "business incentive", "investment credit", "job creation credit",
        "economic development incentive", "opportunity zone",
        "tax subsidy",
    ],
    "energy_policy": [
        # " ppa " keeps its surrounding spaces on purpose: they stop "ppa"
        # from matching inside longer words.
        "renewable energy", "clean energy", "energy efficiency",
        "power purchase agreement", " ppa ", "green tariff",
        "clean power", "carbon neutral", "net zero", "decarbonization",
        "energy procurement", "24/7 clean energy", "carbon-free",
        "clean electricity", "energy storage", "virtual power plant",
        "net metering", "green power",
    ],
    "siting_permitting": [
        "conditional use permit", "special use permit", "land use permit",
        "zoning", "facility siting", "environmental review",
        "environmental impact", "noise ordinance", "setback requirement",
        "building permit", "construction permit", "site approval",
        "local approval", "permit requirement", "permitting process",
        "local control", "preemption",
    ],
}
|
|
|
|
# LegiScan numeric status/progress codes mapped to display labels.
# The codes form the contiguous range 0..12, so the mapping is built by position.
STATUS_LABELS = dict(enumerate((
    "N/A",
    "Introduced",
    "Engrossed",
    "Enrolled",
    "Passed",
    "Vetoed",
    "Failed",
    "Override",
    "Chaptered",
    "Referred",
    "Report Pass",
    "Report DNP",
    "Draft",
)))
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Module logger; main() raises its verbosity to DEBUG when --verbose is given.
log = logging.getLogger(__name__)

# Root handler: INFO and above, prefixed with a wall-clock timestamp.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_db_connection():
    """Open a psycopg2 connection to DB_NAME using the PGWEB_* environment variables.

    Reads PGWEB_HOST, PGWEB_PORT, PGWEB_USER and PGWEB_PASSWORD (in that order);
    a missing variable raises KeyError.
    """
    settings = {
        option: os.environ[f"PGWEB_{option.upper()}"]
        for option in ("host", "port", "user", "password")
    }
    return psycopg2.connect(**settings, dbname=DB_NAME)
|
|
|
|
|
|
DDL = """
|
|
CREATE TABLE IF NOT EXISTS legiscan_sessions (
|
|
session_id INTEGER PRIMARY KEY,
|
|
state_id INTEGER NOT NULL,
|
|
state_abbr VARCHAR(2) NOT NULL,
|
|
year_start INTEGER NOT NULL,
|
|
year_end INTEGER NOT NULL,
|
|
session_title TEXT,
|
|
session_tag TEXT,
|
|
is_special BOOLEAN DEFAULT FALSE,
|
|
is_prior BOOLEAN DEFAULT FALSE,
|
|
dataset_hash VARCHAR(32),
|
|
dataset_date DATE,
|
|
dataset_size_mb FLOAT,
|
|
bill_count INTEGER DEFAULT 0,
|
|
imported_at TIMESTAMPTZ
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS legiscan_bills (
|
|
bill_id INTEGER PRIMARY KEY,
|
|
session_id INTEGER REFERENCES legiscan_sessions(session_id),
|
|
state VARCHAR(2) NOT NULL,
|
|
bill_number VARCHAR(50),
|
|
bill_type VARCHAR(10),
|
|
title TEXT,
|
|
description TEXT,
|
|
status INTEGER,
|
|
status_date DATE,
|
|
completed INTEGER DEFAULT 0,
|
|
body VARCHAR(10),
|
|
url TEXT,
|
|
state_link TEXT,
|
|
change_hash VARCHAR(32),
|
|
subjects TEXT[],
|
|
sponsor_count INTEGER DEFAULT 0,
|
|
vote_count INTEGER DEFAULT 0,
|
|
text_count INTEGER DEFAULT 0,
|
|
is_relevant BOOLEAN DEFAULT FALSE,
|
|
relevance_tags TEXT[],
|
|
imported_at TIMESTAMPTZ DEFAULT NOW()
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_state ON legiscan_bills(state);
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_session ON legiscan_bills(session_id);
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_status ON legiscan_bills(status);
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_relevant ON legiscan_bills(is_relevant) WHERE is_relevant;
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_subjects ON legiscan_bills USING gin(subjects);
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_rtags ON legiscan_bills USING gin(relevance_tags);
|
|
CREATE INDEX IF NOT EXISTS idx_ls_bills_fts ON legiscan_bills
|
|
USING gin(to_tsvector('english',
|
|
COALESCE(title, '') || ' ' || COALESCE(description, '')));
|
|
"""
|
|
|
|
|
|
def setup_db(conn):
    """Run the DDL script (tables + indexes, all IF NOT EXISTS) and commit."""
    cursor = conn.cursor()
    with cursor:
        cursor.execute(DDL)
    conn.commit()
    log.info("Database tables and indexes ready.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LegiScan API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _api_get(params: dict, timeout: int = 120) -> dict:
    """Make one LegiScan API call and return the parsed JSON.

    Args:
        params: Query parameters for the call (e.g. ``{"op": "getDatasetList"}``).
            The API key is added to a copy; the caller's dict is not modified.
        timeout: Request timeout in seconds, passed to ``requests.get``.

    Returns:
        The decoded JSON body, whose ``status`` field is ``"OK"``.

    Raises:
        requests.RequestException: on transport or HTTP errors. requests embeds
            the full request URL (including ``key=...``) in its error text, and
            callers log these messages. The exception is therefore re-raised as
            the same type with the key masked, and the unredacted original is
            dropped from the chain.
        RuntimeError: if LegiScan answers with a non-"OK" status.
    """
    query = {**params, "key": API_KEY}
    try:
        resp = requests.get(API_BASE, params=query, timeout=timeout)
        resp.raise_for_status()
    except requests.RequestException as exc:
        message = str(exc)
        if not API_KEY or API_KEY not in message:
            raise
        redacted = message.replace(API_KEY, "***")
        try:
            clean = type(exc)(redacted, request=exc.request, response=exc.response)
        except TypeError:
            # Subclass with an unusual constructor: fall back to the base type.
            clean = requests.RequestException(redacted, request=exc.request, response=exc.response)
        raise clean from None

    data = resp.json()
    if data.get("status") != "OK":
        raise RuntimeError(f"LegiScan API error: {data}")
    return data
|
|
|
|
|
|
def get_all_dataset_metadata(year_start: int, state_filter: Optional[str] = None) -> list[dict]:
    """Fetch the dataset list (one API call) and keep sessions starting in or after year_start.

    Args:
        year_start: Earliest session ``year_start`` to keep. A multi-year session
            that began before this year is dropped.
        state_filter: Optional state abbreviation (case-insensitive, e.g. "CA").
            When given, getDatasetList is asked for that state only, instead of
            fetching every state and intersecting with a second per-state call.

    Returns:
        The matching getDatasetList entries, in API order.
    """
    params = {"op": "getDatasetList"}
    if state_filter:
        params["state"] = state_filter.upper()
        log.info(f"Fetching dataset list from LegiScan for {params['state']}…")
    else:
        log.info("Fetching dataset list from LegiScan…")

    sessions = _api_get(params)["datasetlist"]
    log.info(f" Total sessions returned: {len(sessions)}")

    sessions = [s for s in sessions if s["year_start"] >= year_start]
    log.info(f" Sessions matching filters: {len(sessions)}")
    return sessions
|
|
|
|
|
|
def download_dataset_zip(session: dict, dry_run: bool = False) -> tuple[Optional[bytes], bool]:
    """Return a session's dataset ZIP, downloading it via the API on a cache miss.

    ZIPs are cached as ``CACHE_DIR/{session_id}_{dataset_hash}.zip``, so a new
    dataset_hash from LegiScan naturally produces a cache miss.

    Args:
        session: getDatasetList entry. Reads session_id and dataset_hash, plus
            access_key and dataset_size when downloading.
        dry_run: On a cache miss, log what would be downloaded and return
            without touching the network or the filesystem.

    Returns:
        (zip_bytes, api_call_made). zip_bytes is None only for a dry-run cache
        miss. api_call_made is True only when the network was actually hit, so
        the caller can rate-limit appropriately.

    Raises:
        RuntimeError: if the downloaded payload is not a ZIP archive (nothing is cached).
    """
    session_id = session["session_id"]
    cache_path = CACHE_DIR / f"{session_id}_{session['dataset_hash']}.zip"

    if cache_path.exists():
        log.debug(f" Cache hit: {cache_path.name}")
        return cache_path.read_bytes(), False

    size_mb = (session.get("dataset_size") or 0) / 1e6
    if dry_run:
        log.info(f" [dry-run] Would download session {session_id} ({size_mb:.1f} MB)")
        return None, False

    log.info(f" Downloading session {session_id} ({size_mb:.1f} MB)…")
    data = _api_get({"op": "getDataset", "access_key": session["access_key"], "id": session_id})
    zip_bytes = base64.b64decode(data["dataset"]["zip"])
    if not zipfile.is_zipfile(io.BytesIO(zip_bytes)):
        raise RuntimeError(f"Session {session_id}: downloaded dataset is not a valid ZIP archive")

    # Write under a temporary name, then rename. An interrupted write must never
    # leave a truncated file at cache_path, because a cache hit is trusted as-is.
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = cache_path.with_name(cache_path.name + ".part")
    tmp_path.write_bytes(zip_bytes)
    tmp_path.replace(cache_path)
    log.info(f" Cached → {cache_path.name}")
    return zip_bytes, True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Relevance tagging
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# (tag, compiled pattern) pairs for RELEVANCE_KEYWORDS, built lazily on first use
# so the keyword lists are not re-processed for every bill.
_DEFAULT_TAG_PATTERNS: Optional[list] = None


def _keyword_regex(keyword: str) -> str:
    """Return a regex fragment that matches *keyword* in lower-cased bill text.

    - A match must begin at the start of a word. Otherwise "rate structure"
      would fire inside "corporate structure" and "ai campus" inside
      "hawaii campus".
    - The end is left open so inflections still match ("data centers",
      "ratepayers", "hyperscaler").
    - A keyword written with trailing whitespace (e.g. " ppa ") also requires
      a word end, which preserves its whole-word intent.
    - Spaces inside a phrase match any run of whitespace, including line breaks.
    """
    body = r"\s+".join(re.escape(word) for word in keyword.split())
    tail = r"(?!\w)" if keyword[-1:].isspace() else ""
    return rf"(?<!\w){body}{tail}"


def _compile_tag_patterns(keywords: dict[str, list[str]]) -> list[tuple[str, re.Pattern]]:
    """Compile one alternation per tag, keeping the mapping's tag order; skip blank keywords."""
    compiled = []
    for tag, tag_keywords in keywords.items():
        fragments = [_keyword_regex(kw) for kw in tag_keywords if kw.strip()]
        if fragments:
            compiled.append((tag, re.compile("(?:" + "|".join(fragments) + ")")))
    return compiled


def score_relevance(
    title: str,
    description: str,
    subjects: list[str],
    keywords: Optional[dict[str, list[str]]] = None,
) -> tuple[bool, list[str]]:
    """Return (is_relevant, list_of_matched_tags).

    Args:
        title: Bill title; None/empty is treated as "".
        description: Bill description; None/empty is treated as "".
        subjects: LegiScan subject names attached to the bill (None allowed).
        keywords: Mapping of tag -> keyword phrases. Defaults to RELEVANCE_KEYWORDS.

    Returns:
        ``(bool(tags), tags)``, where *tags* lists every tag with at least one
        keyword hit, in the mapping's order. The text is lower-cased before
        matching, so keywords are expected to be lower-case.
    """
    global _DEFAULT_TAG_PATTERNS
    if keywords is None:
        if _DEFAULT_TAG_PATTERNS is None:
            _DEFAULT_TAG_PATTERNS = _compile_tag_patterns(RELEVANCE_KEYWORDS)
        patterns = _DEFAULT_TAG_PATTERNS
    else:
        patterns = _compile_tag_patterns(keywords)

    haystack = " ".join([
        (title or "").lower(),
        (description or "").lower(),
        " ".join(s.lower() for s in subjects or ()),
    ])
    tags = [tag for tag, pattern in patterns if pattern.search(haystack)]
    return bool(tags), tags
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ZIP processing and DB loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _state_abbr_from_zip(zf: zipfile.ZipFile) -> str:
|
|
"""Extract the state abbreviation from the ZIP's path structure."""
|
|
for name in zf.namelist():
|
|
parts = name.split("/")
|
|
if len(parts) >= 1 and len(parts[0]) == 2:
|
|
return parts[0]
|
|
return "??"
|
|
|
|
|
|
# Upsert for legiscan_bills. A row is rewritten only when LegiScan's change_hash
# differs. When it does, every LegiScan-sourced column is refreshed (not just
# status/tags), so title/description stay consistent with the relevance tags
# computed from them.
_BILL_UPSERT_SQL = """
INSERT INTO legiscan_bills (
    bill_id, session_id, state, bill_number, bill_type,
    title, description, status, status_date, completed,
    body, url, state_link, change_hash, subjects,
    sponsor_count, vote_count, text_count,
    is_relevant, relevance_tags, imported_at
) VALUES %s
ON CONFLICT (bill_id) DO UPDATE SET
    bill_number = EXCLUDED.bill_number,
    bill_type = EXCLUDED.bill_type,
    title = EXCLUDED.title,
    description = EXCLUDED.description,
    body = EXCLUDED.body,
    url = EXCLUDED.url,
    state_link = EXCLUDED.state_link,
    change_hash = EXCLUDED.change_hash,
    status = EXCLUDED.status,
    status_date = EXCLUDED.status_date,
    completed = EXCLUDED.completed,
    subjects = EXCLUDED.subjects,
    sponsor_count = EXCLUDED.sponsor_count,
    vote_count = EXCLUDED.vote_count,
    text_count = EXCLUDED.text_count,
    is_relevant = EXCLUDED.is_relevant,
    relevance_tags = EXCLUDED.relevance_tags,
    imported_at = NOW()
WHERE legiscan_bills.change_hash IS DISTINCT FROM EXCLUDED.change_hash
"""

# One placeholder per supplied column (20); imported_at is filled server-side.
_BILL_UPSERT_TEMPLATE = "(" + "%s," * 20 + "NOW())"

# Position of is_relevant inside a row tuple built by _bill_row().
_IS_RELEVANT_COL = 18


def _bill_row(b: dict, session_id: int, state_abbr: Optional[str]) -> tuple:
    """Convert one LegiScan bill dict into a row tuple in _BILL_UPSERT_SQL column order."""
    subjects = [s["subject_name"] for s in (b.get("subjects") or []) if s.get("subject_name")]
    is_rel, tags = score_relevance(b.get("title", ""), b.get("description", ""), subjects)
    return (
        b["bill_id"],
        session_id,
        b.get("state", state_abbr),
        b.get("bill_number"),
        b.get("bill_type"),
        b.get("title"),
        b.get("description"),
        b.get("status"),
        b.get("status_date") or None,  # "" -> NULL for the DATE column
        b.get("completed", 0),
        b.get("body"),
        b.get("url"),
        b.get("state_link"),
        b.get("change_hash"),
        subjects or None,
        len(b.get("sponsors") or []),
        len(b.get("votes") or []),
        len(b.get("texts") or []),
        is_rel,
        tags or None,
    )


def _parse_bill_rows(zf: zipfile.ZipFile, bill_files: list[str], session_id: int,
                     state_abbr: Optional[str]) -> list[tuple]:
    """Parse bill JSON members into row tuples, skipping (with a warning) any malformed file.

    Rows are de-duplicated by bill_id (last one wins). A single INSERT ... ON
    CONFLICT statement cannot update the same row twice.
    """
    rows_by_id: dict = {}
    for fname in bill_files:
        try:
            raw = json.loads(zf.read(fname))
            row = _bill_row(raw.get("bill", raw), session_id, state_abbr)
        except Exception as e:
            log.warning(f" Could not parse {fname}: {e}")
            continue
        rows_by_id[row[0]] = row
    return list(rows_by_id.values())


def _upsert_bill_rows(conn, rows: list[tuple], page_size: int = 500) -> int:
    """Upsert rows page by page; return how many rows were inserted or changed.

    Each execute_values call covers exactly one page, so cur.rowcount is that
    page's count. With a multi-page call, rowcount would reflect only the last page.
    """
    changed = 0
    with conn.cursor() as cur:
        for start in range(0, len(rows), page_size):
            page = rows[start:start + page_size]
            psycopg2.extras.execute_values(
                cur, _BILL_UPSERT_SQL, page,
                template=_BILL_UPSERT_TEMPLATE, page_size=len(page),
            )
            changed += max(cur.rowcount, 0)
    return changed


def process_dataset(
    session: dict,
    zip_bytes: bytes,
    conn,
    state_abbr: Optional[str] = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> int:
    """Parse all bill JSONs from a ZIP and upsert into legiscan_bills. Returns count.

    Args:
        session: Dataset-list entry; only ``session_id`` is read.
        zip_bytes: Raw dataset ZIP.
        conn: Open psycopg2 connection (not used when dry_run is True).
        state_abbr: Fallback state for bills without a ``state`` field. Derived
            from the ZIP's directory layout when omitted.
        dry_run: Parse and report only; no DB writes.
        verbose: Log per-session bill/relevant/upserted counts.

    Returns:
        Number of distinct bills parsed (0 if the ZIP holds no bill files, in
        which case the session row is left untouched).

    Raises:
        zipfile.BadZipFile if zip_bytes is not a ZIP archive. Database errors
        propagate uncommitted; main() rolls back on failure.
    """
    session_id = session["session_id"]

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        if not state_abbr:
            state_abbr = _state_abbr_from_zip(zf)
        bill_files = [n for n in zf.namelist() if "/bill/" in n and n.endswith(".json")]
        if not bill_files:
            log.warning(f" Session {session_id}: no bill files found in ZIP.")
            return 0
        rows = _parse_bill_rows(zf, bill_files, session_id, state_abbr)

    if dry_run:
        log.info(f" [dry-run] Session {session_id} ({state_abbr}): would insert/update {len(rows)} bills")
        return len(rows)

    changed = _upsert_bill_rows(conn, rows)

    # Record the session's bill count in the same transaction as the bills.
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE legiscan_sessions SET bill_count = %s, imported_at = NOW() WHERE session_id = %s",
            (len(rows), session_id),
        )
    conn.commit()

    if verbose:
        relevant = sum(1 for r in rows if r[_IS_RELEVANT_COL])
        log.info(f" Session {session_id} ({state_abbr}): {len(rows)} bills, {relevant} relevant, {changed} upserted")
    return len(rows)
|
|
|
|
|
|
def upsert_session(session: dict, state_abbr: str, conn, dry_run: bool = False):
    """Insert or refresh the legiscan_sessions row for one dataset entry, then commit.

    Args:
        session: getDatasetList entry (session_id, state_id, year_start, year_end,
            session_title, session_tag, special, prior, dataset_hash,
            dataset_date, dataset_size).
        state_abbr: Two-letter state code to store; may be the "??" placeholder
            when it could not be derived.
        conn: Open psycopg2 connection.
        dry_run: When True, do nothing.

    On conflict, every metadata column is overwritten with the entry's current
    values, so later changes (e.g. is_prior or session_title) are picked up.
    A "??" placeholder never replaces a known state_abbr. bill_count and
    imported_at are left alone; process_dataset() maintains them.
    """
    if dry_run:
        return
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO legiscan_sessions
                (session_id, state_id, state_abbr, year_start, year_end,
                 session_title, session_tag, is_special, is_prior,
                 dataset_hash, dataset_date, dataset_size_mb)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            ON CONFLICT (session_id) DO UPDATE SET
                state_id = EXCLUDED.state_id,
                state_abbr = COALESCE(NULLIF(EXCLUDED.state_abbr, '??'), legiscan_sessions.state_abbr),
                year_start = EXCLUDED.year_start,
                year_end = EXCLUDED.year_end,
                session_title = EXCLUDED.session_title,
                session_tag = EXCLUDED.session_tag,
                is_special = EXCLUDED.is_special,
                is_prior = EXCLUDED.is_prior,
                dataset_hash = EXCLUDED.dataset_hash,
                dataset_date = EXCLUDED.dataset_date,
                dataset_size_mb = EXCLUDED.dataset_size_mb
        """, (
            session["session_id"],
            session["state_id"],
            state_abbr,
            session["year_start"],
            session["year_end"],
            session.get("session_title"),
            session.get("session_tag"),
            bool(session.get("special")),
            bool(session.get("prior")),
            session.get("dataset_hash"),
            session.get("dataset_date"),
            (session.get("dataset_size") or 0) / 1e6,
        ))
    conn.commit()
|
|
|
|
|
|
def needs_import(session: dict, conn) -> bool:
    """Report whether a session must be (re)loaded.

    True when legiscan_sessions has no row for the session, or when the stored
    dataset_hash differs from the one in *session*.
    """
    lookup_sql = "SELECT dataset_hash FROM legiscan_sessions WHERE session_id = %s"
    with conn.cursor() as cur:
        cur.execute(lookup_sql, (session["session_id"],))
        stored = cur.fetchone()
    return stored is None or stored[0] != session["dataset_hash"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Retag phase
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def retag_all_bills(conn, dry_run: bool = False, verbose: bool = False):
    """Re-score relevance for every bill already in the DB.

    Bills are streamed through a server-side (named) cursor instead of being
    fetched all at once. Only rows whose is_relevant / relevance_tags actually
    change are written back, in a single transaction.

    Args:
        conn: Open psycopg2 connection. May be None only together with
            dry_run=True (main() opens no connection in dry-run mode); the call
            then just logs that nothing can be scored.
        dry_run: Score and report counts without writing.
        verbose: Also log how many rows changed.

    Raises:
        ValueError: if conn is None and dry_run is False.
    """
    if conn is None:
        if not dry_run:
            raise ValueError("retag_all_bills() needs a database connection unless dry_run=True")
        log.info(" [dry-run] No database connection; cannot score bills.")
        return

    log.info("Re-tagging all bills…")
    scored = relevant = 0
    changes = []
    with conn.cursor(name="legiscan_retag", cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.itersize = 10_000  # rows fetched from the server per round trip
        cur.execute(
            "SELECT bill_id, title, description, subjects, is_relevant, relevance_tags "
            "FROM legiscan_bills"
        )
        for row in cur:
            is_rel, tags = score_relevance(
                row["title"] or "",
                row["description"] or "",
                row["subjects"] or [],
            )
            new_tags = tags or None
            scored += 1
            if is_rel:
                relevant += 1
            if row["is_relevant"] != is_rel or row["relevance_tags"] != new_tags:
                changes.append((is_rel, new_tags, row["bill_id"]))

    if dry_run:
        if verbose:
            log.info(f" [dry-run] {len(changes)} bills would change tags")
        log.info(f" [dry-run] Would tag {relevant}/{scored} bills as relevant")
        return

    if changes:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                "UPDATE legiscan_bills SET is_relevant = data.is_rel, relevance_tags = data.tags "
                "FROM (VALUES %s) AS data(is_rel, tags, bill_id) "
                "WHERE legiscan_bills.bill_id = data.bill_id::integer",
                changes,
                template="(%s, %s::text[], %s)",
                page_size=1000,
            )
    conn.commit()

    if verbose:
        log.info(f" Updated tags on {len(changes)} bills.")
    log.info(f" Tagged {relevant}/{scored} bills as relevant.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Summary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_summary(conn):
    """Print headline counts, then per-tag and per-state breakdowns of relevant bills."""
    headline_counts = [
        ("Total sessions", "SELECT COUNT(*) FROM legiscan_sessions"),
        ("Total bills", "SELECT COUNT(*) FROM legiscan_bills"),
        ("Relevant bills", "SELECT COUNT(*) FROM legiscan_bills WHERE is_relevant"),
        ("States covered", "SELECT COUNT(DISTINCT state) FROM legiscan_bills"),
    ]

    print("\n--- LegiScan ingestion summary ---")
    with conn.cursor() as cur:
        for label, count_sql in headline_counts:
            cur.execute(count_sql)
            value = cur.fetchone()[0]
            print(f" {label}: {value:,}")

    # Each section: (heading, query returning (key, n) rows, line formatter).
    # A section is printed only when its query returns rows.
    sections = [
        (
            "\n Relevant bills by tag:",
            """
            SELECT tag, COUNT(*) AS n
            FROM legiscan_bills, unnest(relevance_tags) AS tag
            GROUP BY tag ORDER BY n DESC
            """,
            lambda key, n: f" {key:<30} {n:>6,}",
        ),
        (
            "\n Top states by relevant bill count:",
            """
            SELECT state, COUNT(*) AS n
            FROM legiscan_bills WHERE is_relevant
            GROUP BY state ORDER BY n DESC LIMIT 15
            """,
            lambda key, n: f" {key} {n:>5,}",
        ),
    ]
    for heading, breakdown_sql, fmt in sections:
        with conn.cursor() as cur:
            cur.execute(breakdown_sql)
            breakdown = cur.fetchall()
        if breakdown:
            print(heading)
            for key, n in breakdown:
                print(fmt(key, n))
    print()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]`` (argparse default).

    Returns:
        The parsed namespace. ``state`` is upper-cased so that API filters and
        stored state abbreviations are consistent.
    """
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--all", action="store_true", help="Run setup-db + fetch + load + tag")
    p.add_argument("--setup-db", action="store_true", help="Create/update DB tables")
    p.add_argument("--fetch", action="store_true", help="Download dataset ZIPs")
    p.add_argument("--load", action="store_true", help="Load cached ZIPs into DB")
    p.add_argument("--tag", action="store_true", help="Retag all bills for relevance")
    p.add_argument("--state", default=None, metavar="XX", type=str.upper, help="Limit to one state")
    p.add_argument(
        "--year-start", type=int, default=MIN_YEAR_DEFAULT, dest="year_start", metavar="YYYY",
        help=f"Earliest session start year to include (default: {MIN_YEAR_DEFAULT})",
    )
    p.add_argument("--dry-run", action="store_true",
                   help="Print what would be done; skip dataset downloads and DB writes")
    p.add_argument("--verbose", action="store_true", help="Extra progress output (DEBUG logging)")
    return p.parse_args(argv)
|
|
|
|
|
|
def _load_cached_zip(session: dict) -> Optional[bytes]:
    """Read a session's dataset ZIP from CACHE_DIR; warn and return None on a cache miss."""
    session_id = session["session_id"]
    cache_path = CACHE_DIR / f"{session_id}_{session['dataset_hash']}.zip"
    if not cache_path.exists():
        log.warning(f" Cache miss for session {session_id} — run --fetch first.")
        return None
    return cache_path.read_bytes()


def _zip_state_abbr(zip_bytes: bytes) -> str:
    """Best-effort state abbreviation from a dataset ZIP; "??" if the archive cannot be read."""
    try:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
            return _state_abbr_from_zip(zf)
    except Exception as e:
        log.debug(f" Could not read state from ZIP: {e}")
        return "??"


def _mark_session_for_retry(conn, session_id: int) -> None:
    """Clear a session's stored dataset_hash so needs_import() selects it again on the next run.

    upsert_session() commits the new dataset_hash before the bills are loaded.
    Without this, a failed load would leave the session looking up to date and
    it would be skipped forever.
    """
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE legiscan_sessions SET dataset_hash = NULL WHERE session_id = %s",
            (session_id,),
        )
    conn.commit()


def _fetch_and_load(args, conn, state: Optional[str], do_fetch: bool, do_load: bool) -> None:
    """Fetch and/or load every matching session, one at a time to keep memory bounded."""
    sessions = get_all_dataset_metadata(args.year_start, state_filter=state)
    total = len(sessions)
    log.info(f"Processing {total} sessions (year_start ≥ {args.year_start})…")

    total_bills = skipped = failed = 0
    for i, session in enumerate(sessions, 1):
        session_id = session["session_id"]
        title = session.get("session_title", "")

        # Skip sessions whose dataset_hash matches what is already in the DB.
        if do_load and conn is not None and not needs_import(session, conn):
            log.debug(f" [{i}/{total}] Session {session_id} ({title}) — hash unchanged, skipping.")
            skipped += 1
            continue

        log.info(f"[{i}/{total}] Session {session_id}: {title}")

        if do_fetch:
            try:
                zip_bytes, api_called = download_dataset_zip(session, dry_run=args.dry_run)
            except Exception as e:
                log.error(f" Download failed for session {session_id}: {e}")
                failed += 1
                # A failed download has most likely still hit the API; keep pacing requests.
                time.sleep(RATE_LIMIT_DELAY)
                continue
            if api_called:
                time.sleep(RATE_LIMIT_DELAY)
        else:
            zip_bytes = _load_cached_zip(session)

        if not (do_load and zip_bytes):
            continue

        state_abbr = state or _zip_state_abbr(zip_bytes)
        try:
            if conn is not None:
                upsert_session(session, state_abbr, conn)
            total_bills += process_dataset(
                session, zip_bytes, conn,
                state_abbr=state_abbr,
                dry_run=args.dry_run,
                verbose=args.verbose,
            )
        except Exception as e:
            failed += 1
            log.error(f" Load failed for session {session_id}: {e}")
            if conn is not None:
                conn.rollback()
                try:
                    _mark_session_for_retry(conn, session_id)
                except psycopg2.Error as reset_err:
                    conn.rollback()
                    log.error(f" Could not reset dataset_hash for session {session_id}: {reset_err}")

    log.info(f"Fetch/load complete. Bills processed: {total_bills:,}. Skipped (up-to-date): {skipped}.")
    if failed:
        log.warning(f" Sessions failed: {failed}.")


def main():
    """CLI entry point: parse flags, then run the setup, fetch/load, tag and summary phases."""
    args = parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    do_setup = args.all or args.setup_db
    do_fetch = args.all or args.fetch
    do_load = args.all or args.load
    do_tag = args.all or args.tag

    if not any([do_setup, do_fetch, do_load, do_tag]):
        log.error("Specify at least one phase: --all, --setup-db, --fetch, --load, --tag")
        sys.exit(1)

    # Only the fetch/load phases call the LegiScan API.
    if (do_fetch or do_load) and not API_KEY:
        log.error("LEGISCAN_API_KEY is not set.")
        sys.exit(1)

    # Upper-case so API filters and legiscan_sessions.state_abbr stay consistent.
    state = args.state.upper() if args.state else None

    conn = None if args.dry_run else get_db_connection()
    try:
        # ── Setup ──────────────────────────────────────────────────────────
        if do_setup:
            if args.dry_run:
                log.info("[dry-run] Would create legiscan_sessions and legiscan_bills tables.")
            else:
                setup_db(conn)

        # ── Fetch + Load (interleaved per session for memory efficiency) ────
        if do_fetch or do_load:
            _fetch_and_load(args, conn, state, do_fetch, do_load)

        # ── Tag ────────────────────────────────────────────────────────────
        # Runs after loading as well: the load phase only rewrites bills whose
        # change_hash changed, so unchanged bills would otherwise keep tags
        # computed from an older keyword list.
        if do_tag:
            if conn is None:
                log.info("[dry-run] Would re-tag all loaded bills for relevance.")
            else:
                retag_all_bills(conn, dry_run=False, verbose=args.verbose)

        # ── Summary ────────────────────────────────────────────────────────
        if conn is not None:
            print_summary(conn)
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|