{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# RDH Precinct Vote Data for Master Data Centers\n", "\n", "This notebook discovers, downloads, stages, and spatially joins Redistricting Data Hub precinct-level election data to `public.master_data_centers`.\n", "\n", "It is designed to be rerunnable as new data centers are added:\n", "- Data-center locations come from `public.master_data_centers`.\n", "- RDH credentials are read from `RDH_USERNAME` / `RDH_PASSWORD` or prompted securely with `getpass`.\n", "- RDH files are downloaded into `data/rdh_precinct_vote/`.\n", "- Original precinct attributes are preserved in `properties jsonb` because RDH vote-column names vary by state/year/election.\n", "- Matches are written to `public.data_center_rdh_precinct_vote_matches`, joinable by `master_id`.\n", "\n", "Primary join method is point-in-precinct using longitude/latitude. Census tract context is included as a fallback/diagnostic path when the existing census tract table is available.\n", "\n", "Local RDH API reference used for this notebook:\n", "- `/home/dadams/Repos/api-redistricting_datahub/RDH_API.ipynb`\n", "- `/home/dadams/Repos/api-redistricting_datahub/RDH_API_SET_PARAMS.ipynb`\n", "- `/home/dadams/Repos/api-redistricting_datahub/Download_API.txt`\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "import hashlib\n", "import io\n", "import json\n", "import os\n", "import re\n", "import shutil\n", "import subprocess\n", "import time\n", "import zipfile\n", "from getpass import getpass\n", "from pathlib import Path\n", "from urllib.parse import parse_qs, urlparse, unquote\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import psycopg2\n", "import requests\n", "from psycopg2 import sql\n", "from psycopg2.extras import Json, execute_values\n", "\n", "try:\n", " import geopandas as gpd\n", " HAS_GEOPANDAS = True\n", "except ImportError:\n", " gpd = None\n", " 
HAS_GEOPANDAS = False\n", "\n", "pd.set_option('display.max_columns', 120)\n", "pd.set_option('display.max_rows', 120)\n", "\n", "print('pandas:', pd.__version__)\n", "print('requests:', requests.__version__)\n", "print('geopandas:', 'ok' if HAS_GEOPANDAS else 'not installed; spatial file loading cells will be skipped')\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2", "metadata": {}, "outputs": [], "source": [ "def load_env_file(env_path: str = '.env') -> None:\n", " p = Path(env_path)\n", " if not p.exists():\n", " print(f'No {env_path} file found in {Path.cwd()}')\n", " return\n", "\n", " loaded = 0\n", " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", " line = raw_line.strip()\n", " if not line or line.startswith('#') or '=' not in line:\n", " continue\n", " key, value = line.split('=', 1)\n", " key = key.strip()\n", " value = value.strip().strip('\"').strip(\"'\")\n", " if key and key not in os.environ:\n", " os.environ[key] = value\n", " loaded += 1\n", " print(f'Loaded {loaded} env var(s) from {env_path}')\n", "\n", "\n", "def require_env(keys):\n", " missing = [k for k in keys if not os.getenv(k)]\n", " if missing:\n", " raise EnvironmentError(\n", " 'Missing required env vars in notebook kernel: ' + ', '.join(missing) +\n", " '.\\nSet them in this notebook, or add them to a .env file in this folder.'\n", " )\n", "\n", "\n", "load_env_file('.env')\n", "require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])\n", "\n", "DB_NAME = 'data_centers'\n", "MASTER_TABLE = 'public.master_data_centers'\n", "CENSUS_TRACT_TABLE = 'public.data_center_census_tracts_2024'\n", "\n", "LAYER_TABLE = 'public.rdh_precinct_vote_layers'\n", "FEATURE_TABLE = 'public.rdh_precinct_vote_features'\n", "MATCH_TABLE = 'public.data_center_rdh_precinct_vote_matches'\n", "\n", "\n", "def get_conn():\n", " return psycopg2.connect(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " 
user=os.environ['PGWEB_USER'],\n", " password=os.environ['PGWEB_PASSWORD'],\n", " dbname=DB_NAME,\n", " )\n", "\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select current_database(), current_user')\n", " db, usr = cur.fetchone()\n", " print('Connected to DB:', db)\n", " print('As user:', usr)\n", " cur.execute('create extension if not exists postgis')\n", " cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n", " if cur.fetchone()[0] is None:\n", " raise RuntimeError(f'{MASTER_TABLE} does not exist. Run build_master_data_centers.py first.')\n", " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n", " print(f'{MASTER_TABLE} rows:', f'{cur.fetchone()[0]:,}')" ] }, { "cell_type": "markdown", "id": "3", "metadata": {}, "source": [ "## Parameters\n", "\n", "The defaults now target both 2020 and 2024 precinct election layers across all inferred data-center states. Set `TARGET_STATES` to a small list like `['VA']` for a quick pilot run, or keep `TARGET_STATES = None` to infer all states from `public.master_data_centers`. Use `FILTER_YEARS_ANY = []` to keep all years returned by RDH." ] }, { "cell_type": "code", "execution_count": null, "id": "4", "metadata": {}, "outputs": [], "source": [ "RDH_LIST_URL = 'https://redistrictingdatahub.org/wp-json/download/list'\n", "\n", "DATA_DIR = Path('data/rdh_precinct_vote')\n", "RAW_DIR = DATA_DIR / 'raw'\n", "EXTRACT_DIR = DATA_DIR / 'extracted'\n", "MANIFEST_PATH = DATA_DIR / 'rdh_precinct_vote_download_manifest.csv'\n", "LISTING_CACHE_PATH = DATA_DIR / 'rdh_precinct_vote_listing_cache.csv'\n", "\n", "TARGET_STATES = None # None = infer all states from master_data_centers; or list e.g. ['VA','TX']\n", "FILTER_TERMS_ALL = ['election results', 'precinct']\n", "FILTER_TERMS_ANY = [] # e.g. 
['general', 'president']\n", "FILTER_YEARS_ANY = ['2020', '2024'] # set [] to keep all years returned by RDH\n", "PREFERRED_FORMATS = ['SHP'] # point-in-precinct joins need spatial files\n", "\n", "DOWNLOAD_FILES = True\n", "OVERWRITE_DOWNLOADS = False\n", "LOAD_TO_POSTGIS = True\n", "RUN_SPATIAL_MATCH = True\n", "RUN_NEAREST_PRECINCT_FALLBACK = True\n", "NEAREST_PRECINCT_MAX_DISTANCE_M = 500\n", "\n", "REQUEST_SLEEP_SECONDS = 1.0\n", "\n", "for p in [DATA_DIR, RAW_DIR, EXTRACT_DIR]:\n", " p.mkdir(parents=True, exist_ok=True)\n", "\n", "print('Data directory:', DATA_DIR.resolve())\n", "print('Download files:', DOWNLOAD_FILES)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "STATE_NAME_TO_CODE = {\n", " 'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',\n", " 'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',\n", " 'district of columbia': 'DC', 'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI',\n", " 'idaho': 'ID', 'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA',\n", " 'kansas': 'KS', 'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME',\n", " 'maryland': 'MD', 'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN',\n", " 'mississippi': 'MS', 'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE',\n", " 'nevada': 'NV', 'new hampshire': 'NH', 'new jersey': 'NJ', 'new mexico': 'NM',\n", " 'new york': 'NY', 'north carolina': 'NC', 'north dakota': 'ND', 'ohio': 'OH',\n", " 'oklahoma': 'OK', 'oregon': 'OR', 'pennsylvania': 'PA', 'rhode island': 'RI',\n", " 'south carolina': 'SC', 'south dakota': 'SD', 'tennessee': 'TN', 'texas': 'TX',\n", " 'utah': 'UT', 'vermont': 'VT', 'virginia': 'VA', 'washington': 'WA',\n", " 'west virginia': 'WV', 'wisconsin': 'WI', 'wyoming': 'WY',\n", "}\n", "STATE_CODE_TO_NAME = {v: k for k, v in STATE_NAME_TO_CODE.items()}\n", "\n", "\n", "def normalize_state_code(value):\n", " if pd.isna(value) or str(value).strip() == '':\n", " return 
None\n", " raw = str(value).strip()\n", " upper = raw.upper()\n", " if upper in STATE_CODE_TO_NAME:\n", " return upper\n", " return STATE_NAME_TO_CODE.get(raw.lower())\n", "\n", "\n", "def infer_target_states():\n", " if TARGET_STATES:\n", " return sorted({normalize_state_code(s) for s in TARGET_STATES if normalize_state_code(s)})\n", " query = f'''\n", " select state, count(*) as rows\n", " from {MASTER_TABLE}\n", " where geom is not null\n", " group by state\n", " order by state\n", " '''\n", " with get_conn() as conn:\n", " states = pd.read_sql_query(query, conn)\n", " states['state_code'] = states['state'].map(normalize_state_code)\n", " display(states)\n", " missing = states.loc[states['state_code'].isna() & states['state'].notna()]\n", " if not missing.empty:\n", " print('Warning: could not normalize these state values:')\n", " display(missing)\n", " return sorted(states['state_code'].dropna().unique())\n", "\n", "\n", "target_states = infer_target_states()\n", "print('Target state codes:', ', '.join(target_states))\n" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "## RDH Credentials" ] }, { "cell_type": "code", "execution_count": null, "id": "7", "metadata": {}, "outputs": [], "source": [ "RDH_USERNAME = os.getenv('RDH_USERNAME') or os.getenv('RDH_EMAIL')\n", "RDH_PASSWORD = os.getenv('RDH_PASSWORD')\n", "\n", "if not RDH_USERNAME:\n", " RDH_USERNAME = input('RDH username or email: ').strip()\n", "if not RDH_PASSWORD:\n", " RDH_PASSWORD = getpass('RDH password: ')\n", "\n", "if not RDH_USERNAME or not RDH_PASSWORD:\n", " raise RuntimeError('RDH credentials are required. 
Use prompts or set RDH_USERNAME/RDH_PASSWORD.')\n", "\n", "print('RDH username loaded:', RDH_USERNAME)\n", "print('RDH password loaded:', bool(RDH_PASSWORD))\n" ] }, { "cell_type": "markdown", "id": "8", "metadata": {}, "source": [ "## Discover RDH Datasets" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "def get_rdh_list_for_state(state_code):\n", " params = {\n", " 'username': RDH_USERNAME,\n", " 'password': RDH_PASSWORD,\n", " 'format': 'csv',\n", " 'states': STATE_CODE_TO_NAME.get(state_code, state_code).lower(),\n", " }\n", " response = requests.get(RDH_LIST_URL, params=params, timeout=120)\n", " response.raise_for_status()\n", " text = response.content.decode('utf-8', errors='replace')\n", " df = pd.read_csv(io.StringIO(text))\n", " df['query_state_code'] = state_code\n", " df['query_state_name'] = STATE_CODE_TO_NAME.get(state_code, state_code)\n", " time.sleep(REQUEST_SLEEP_SECONDS)\n", " return df\n", "\n", "\n", "def load_or_fetch_rdh_listing(refresh=False):\n", " cached = None\n", " if LISTING_CACHE_PATH.exists() and not refresh:\n", " cached = pd.read_csv(LISTING_CACHE_PATH)\n", " cached_states = set(cached.get('query_state_code', pd.Series(dtype=str)).dropna().unique())\n", " missing = [s for s in target_states if s not in cached_states]\n", " print(f'Cache has {len(cached):,} rows across {len(cached_states)} state(s); missing: {missing or \"none\"}')\n", " else:\n", " cached_states = set()\n", " missing = list(target_states)\n", "\n", " if missing:\n", " frames = [] if cached is None else [cached]\n", " for state_code in missing:\n", " print('Retrieving RDH listing for', state_code)\n", " frames.append(get_rdh_list_for_state(state_code))\n", " cached = pd.concat(frames, ignore_index=True)\n", " LISTING_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)\n", " cached.to_csv(LISTING_CACHE_PATH, index=False)\n", " print('Updated listing cache:', LISTING_CACHE_PATH)\n", "\n", " if cached is not 
None and 'query_state_code' in cached.columns:\n", " cached = cached[cached['query_state_code'].isin(target_states)].copy()\n", " return cached if cached is not None else pd.DataFrame()\n", "\n", "\n", "listing = load_or_fetch_rdh_listing(refresh=False)\n", "print(f'RDH listing rows for target states: {len(listing):,}')\n", "display(listing.head())\n", "display(pd.DataFrame({'column': listing.columns}))\n" ] }, { "cell_type": "code", "execution_count": null, "id": "10", "metadata": {}, "outputs": [], "source": [ "def text_contains_all(text, terms):\n", " text = str(text).lower()\n", " return all(term.lower() in text for term in terms if term)\n", "\n", "\n", "def text_contains_any(text, terms):\n", " terms = [term for term in terms if term]\n", " if not terms:\n", " return True\n", " text = str(text).lower()\n", " return any(term.lower() in text for term in terms)\n", "\n", "\n", "def row_year_match(text):\n", " if not FILTER_YEARS_ANY:\n", " return True\n", " text = str(text)\n", " return any(str(year) in text for year in FILTER_YEARS_ANY)\n", "\n", "\n", "def parse_datasetid(url):\n", " if pd.isna(url):\n", " return None\n", " parsed = urlparse(str(url))\n", " qs = parse_qs(parsed.query)\n", " values = qs.get('datasetid')\n", " return values[0] if values else None\n", "\n", "\n", "def clean_filename_from_url(url, fmt):\n", " base = unquote(str(url).split('?')[0]).rstrip('/')\n", " name = base.split('/')[-1]\n", " if not name:\n", " name = 'rdh_download.zip'\n", " suffix = Path(name).suffix\n", " if not suffix:\n", " suffix = '.zip' if str(fmt).upper() == 'SHP' else ''\n", " name = name + suffix\n", " return re.sub(r'[^A-Za-z0-9._-]+', '_', name)\n", "\n", "\n", "def detect_year(text):\n", " match = re.search(r'\\b(20\\d{2})\\b', str(text))\n", " return match.group(1) if match else None\n", "\n", "\n", "work = listing.copy()\n", "for required in ['Title', 'Format', 'URL']:\n", " if required not in work.columns:\n", " raise RuntimeError(f'RDH listing is missing 
expected column: {required}')\n", "\n", "work['title_format'] = work['Title'].fillna('').astype(str) + ' ' + work['Format'].fillna('').astype(str)\n", "work['datasetid'] = work['URL'].map(parse_datasetid)\n", "work['format_norm'] = work['Format'].fillna('').astype(str).str.upper()\n", "work['filename'] = work.apply(lambda r: clean_filename_from_url(r['URL'], r['Format']), axis=1)\n", "\n", "filtered = work[\n", " work['title_format'].map(lambda x: text_contains_all(x, FILTER_TERMS_ALL))\n", " & work['title_format'].map(lambda x: text_contains_any(x, FILTER_TERMS_ANY))\n", " & work['title_format'].map(row_year_match)\n", " & work['format_norm'].isin(PREFERRED_FORMATS)\n", "].copy()\n", "\n", "filtered = filtered.sort_values(['query_state_code', 'Title', 'Format', 'filename']).reset_index(drop=True)\n", "filtered['detected_year'] = filtered['Title'].map(detect_year)\n", "\n", "print(f'Filtered candidate files: {len(filtered):,}')\n", "year_summary = (\n", " filtered.assign(detected_year=filtered['detected_year'].fillna('unknown'))\n", " .groupby('detected_year', dropna=False)\n", " .size()\n", " .reset_index(name='rows')\n", " .sort_values('detected_year')\n", ")\n", "print('Candidate rows by detected year:')\n", "display(year_summary)\n", "\n", "display(filtered[['query_state_code', 'detected_year', 'Title', 'Format', 'datasetid', 'filename', 'URL']].head(100))" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "## Download Candidate Files" ] }, { "cell_type": "code", "execution_count": null, "id": "12", "metadata": {}, "outputs": [], "source": [ "def layer_id_for_row(row):\n", " key = '|'.join([\n", " str(row.get('query_state_code', '')),\n", " str(row.get('Title', '')),\n", " str(row.get('Format', '')),\n", " str(row.get('datasetid', '')),\n", " str(row.get('URL', '')),\n", " ])\n", " return hashlib.sha1(key.encode('utf-8')).hexdigest()[:16]\n", "\n", "\n", "def download_rdh_file(row):\n", " base_url = str(row['URL']).split('?')[0]\n", " 
params = {\n", " 'username': RDH_USERNAME,\n", " 'password': RDH_PASSWORD,\n", " }\n", " if row.get('datasetid'):\n", " params['datasetid'] = row['datasetid']\n", "\n", " state_dir = RAW_DIR / row['query_state_code']\n", " state_dir.mkdir(parents=True, exist_ok=True)\n", " out_path = state_dir / row['filename']\n", " if out_path.exists() and not OVERWRITE_DOWNLOADS:\n", " return out_path, 'exists'\n", "\n", " response = requests.get(base_url, params=params, timeout=300)\n", " response.raise_for_status()\n", " out_path.write_bytes(response.content)\n", " time.sleep(REQUEST_SLEEP_SECONDS)\n", " return out_path, 'downloaded'\n", "\n", "\n", "download_rows = []\n", "for row in filtered.to_dict('records'):\n", " layer_id = layer_id_for_row(row)\n", " out_path = RAW_DIR / row['query_state_code'] / row['filename']\n", " status = 'planned'\n", " if DOWNLOAD_FILES:\n", " print('Retrieving', row['query_state_code'], row['Title'], row['filename'])\n", " out_path, status = download_rdh_file(row)\n", " elif out_path.exists():\n", " status = 'exists'\n", " download_rows.append({\n", " 'layer_id': layer_id,\n", " 'state_code': row['query_state_code'],\n", " 'title': row['Title'],\n", " 'format': row['Format'],\n", " 'datasetid': row.get('datasetid'),\n", " 'source_url': row['URL'],\n", " 'filename': row['filename'],\n", " 'local_path': str(out_path),\n", " 'download_status': status,\n", " })\n", "\n", "manifest = pd.DataFrame(download_rows)\n", "manifest.to_csv(MANIFEST_PATH, index=False)\n", "print('Wrote manifest:', MANIFEST_PATH)\n", "display(manifest.head(100))\n", "display(manifest['download_status'].value_counts(dropna=False).rename_axis('status').reset_index(name='rows'))\n", "\n", "if not DOWNLOAD_FILES and not manifest['local_path'].map(lambda p: Path(str(p)).exists()).any():\n", " print('No RDH shapefiles have been downloaded yet. 
Review the candidates above, then set DOWNLOAD_FILES=True and rerun this cell and the cells below.')\n" ] }, { "cell_type": "markdown", "id": "13", "metadata": {}, "source": [ "## Unpack and Find Spatial Files" ] }, { "cell_type": "code", "execution_count": null, "id": "14", "metadata": {}, "outputs": [], "source": [ "def extract_archive(path, layer_id):\n", " path = Path(path)\n", " extract_to = EXTRACT_DIR / layer_id\n", " extract_to.mkdir(parents=True, exist_ok=True)\n", " if path.suffix.lower() == '.zip':\n", " with zipfile.ZipFile(path) as zf:\n", " zf.extractall(extract_to)\n", " else:\n", " shutil.copy2(path, extract_to / path.name)\n", " return extract_to\n", "\n", "\n", "def find_spatial_file(folder):\n", " folder = Path(folder)\n", " candidates = (\n", " list(folder.rglob('*.shp')) +\n", " list(folder.rglob('*.geojson')) +\n", " list(folder.rglob('*.gpkg'))\n", " )\n", " if not candidates:\n", " return None\n", " candidates = sorted(candidates, key=lambda p: (p.suffix.lower() != '.shp', len(str(p))))\n", " return candidates[0]\n", "\n", "\n", "if MANIFEST_PATH.exists():\n", " manifest = pd.read_csv(MANIFEST_PATH)\n", "\n", "spatial_rows = []\n", "for row in manifest.to_dict('records'):\n", " local_path = Path(str(row['local_path']))\n", " if not local_path.exists():\n", " spatial_rows.append({**row, 'extract_dir': None, 'spatial_path': None, 'spatial_status': 'missing_download'})\n", " continue\n", " extract_dir = extract_archive(local_path, row['layer_id'])\n", " spatial_path = find_spatial_file(extract_dir)\n", " spatial_rows.append({\n", " **row,\n", " 'extract_dir': str(extract_dir),\n", " 'spatial_path': str(spatial_path) if spatial_path else None,\n", " 'spatial_status': 'found' if spatial_path else 'no_spatial_file',\n", " })\n", "\n", "spatial_manifest = pd.DataFrame(spatial_rows)\n", "spatial_manifest.to_csv(DATA_DIR / 'rdh_precinct_vote_spatial_manifest.csv', index=False)\n", "display(spatial_manifest[['state_code', 'title', 'filename', 
'spatial_status', 'spatial_path']].head(100))\n", "display(spatial_manifest['spatial_status'].value_counts(dropna=False).rename_axis('status').reset_index(name='rows'))\n", "\n", "if not spatial_manifest['spatial_status'].eq('found').any():\n", "    print('No spatial files were found. If spatial_status is missing_download, set DOWNLOAD_FILES=True and rerun the download/unpack cells.')\n" ] }, { "cell_type": "markdown", "id": "15", "metadata": {}, "source": [ "## Create PostGIS Staging Tables" ] }, { "cell_type": "code", "execution_count": null, "id": "16", "metadata": {}, "outputs": [], "source": [ "CREATE_TABLES_SQL = f'''\n", "create table if not exists {LAYER_TABLE} (\n", "  layer_id text primary key,\n", "  state_code text,\n", "  title text,\n", "  format text,\n", "  datasetid text,\n", "  source_url text,\n", "  filename text,\n", "  local_path text,\n", "  spatial_path text,\n", "  metadata jsonb,\n", "  loaded_at timestamptz not null default now()\n", ");\n", "\n", "create table if not exists {FEATURE_TABLE} (\n", "  feature_id text primary key,\n", "  layer_id text not null references {LAYER_TABLE}(layer_id) on delete cascade,\n", "  state_code text,\n", "  source_row integer,\n", "  properties jsonb,\n", "  geom geometry(MultiPolygon, 4326)\n", ");\n", "\n", "create index if not exists rdh_precinct_vote_features_geom_gix\n", "  on {FEATURE_TABLE} using gist (geom);\n", "create index if not exists rdh_precinct_vote_features_layer_idx\n", "  on {FEATURE_TABLE} (layer_id);\n", "create index if not exists rdh_precinct_vote_features_state_idx\n", "  on {FEATURE_TABLE} (state_code);\n", "\n", "create table if not exists {MATCH_TABLE} (\n", "  master_id text not null references {MASTER_TABLE}(master_id) on delete cascade,\n", "  feature_id text not null references {FEATURE_TABLE}(feature_id) on delete cascade,\n", "  layer_id text not null references {LAYER_TABLE}(layer_id) on delete cascade,\n", "  state_code text,\n", "  
join_method text not null,\n", " match_distance_m double precision,\n", " matched_at timestamptz not null default now(),\n", " primary key (master_id, feature_id)\n", ");\n", "create index if not exists data_center_rdh_precinct_vote_matches_master_idx\n", " on {MATCH_TABLE} (master_id);\n", "create index if not exists data_center_rdh_precinct_vote_matches_layer_idx\n", " on {MATCH_TABLE} (layer_id);\n", "'''\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(CREATE_TABLES_SQL)\n", " print('PostGIS staging tables are ready.')\n" ] }, { "cell_type": "markdown", "id": "17", "metadata": {}, "source": [ "## Load Precinct Geometries to PostGIS" ] }, { "cell_type": "code", "execution_count": null, "id": "18", "metadata": {}, "outputs": [], "source": [ "def json_safe(value):\n", " if isinstance(value, (pd.Timestamp, np.datetime64)):\n", " return str(value)\n", " if isinstance(value, np.generic):\n", " return value.item()\n", " if pd.isna(value):\n", " return None\n", " return value\n", "\n", "\n", "def stable_feature_id(layer_id, source_row, geom):\n", " geom_hash = hashlib.sha1(geom.wkb).hexdigest()[:16] if geom is not None else 'nogeom'\n", " return hashlib.sha1(f'{layer_id}|{source_row}|{geom_hash}'.encode('utf-8')).hexdigest()[:24]\n", "\n", "\n", "def load_layer_to_postgis_with_ogr(row):\n", " spatial_path = row.get('spatial_path')\n", " if not spatial_path or not Path(spatial_path).exists():\n", " return {'layer_id': row['layer_id'], 'status': 'missing_spatial_file', 'features': 0}\n", "\n", " ogr2ogr_path = shutil.which('ogr2ogr')\n", " if not ogr2ogr_path:\n", " return {'layer_id': row['layer_id'], 'status': 'missing_geopandas_and_ogr2ogr', 'features': 0}\n", "\n", " staging_table = '_rdh_import_' + re.sub(r'[^a-zA-Z0-9_]', '_', row['layer_id']).lower()\n", " dsn = 'PG:host={host} port={port} user={user} dbname={dbname}'.format(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " 
user=os.environ['PGWEB_USER'],\n", " dbname=DB_NAME,\n", " )\n", " env = os.environ.copy()\n", " env['PGPASSWORD'] = os.environ['PGWEB_PASSWORD']\n", " cmd = [\n", " ogr2ogr_path,\n", " '-f', 'PostgreSQL',\n", " dsn,\n", " str(spatial_path),\n", " '-nln', f'public.{staging_table}',\n", " '-overwrite',\n", " '-t_srs', 'EPSG:4326',\n", " '-nlt', 'PROMOTE_TO_MULTI',\n", " '-dim', 'XY',\n", " '-lco', 'GEOMETRY_NAME=geom',\n", " ]\n", " result = subprocess.run(cmd, env=env, capture_output=True, text=True)\n", " if result.returncode != 0:\n", " return {\n", " 'layer_id': row['layer_id'],\n", " 'status': 'ogr2ogr_failed',\n", " 'features': 0,\n", " 'error': (result.stderr or result.stdout)[-1000:],\n", " }\n", "\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('''\n", " select column_name\n", " from information_schema.columns\n", " where table_schema = 'public' and table_name = %s\n", " order by ordinal_position\n", " ''', (staging_table,))\n", " columns = [r[0] for r in cur.fetchall()]\n", " geom_col = 'geom' if 'geom' in columns else 'wkb_geometry'\n", " order_col = 'ogc_fid' if 'ogc_fid' in columns else 'ctid'\n", "\n", " metadata = {\n", " 'columns': columns,\n", " 'source_spatial_path': str(spatial_path),\n", " 'loader': 'ogr2ogr',\n", " }\n", " cur.execute(sql.SQL('''\n", " insert into {layers} (\n", " layer_id, state_code, title, format, datasetid, source_url,\n", " filename, local_path, spatial_path, metadata, loaded_at\n", " ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())\n", " on conflict (layer_id) do update set\n", " state_code = excluded.state_code,\n", " title = excluded.title,\n", " format = excluded.format,\n", " datasetid = excluded.datasetid,\n", " source_url = excluded.source_url,\n", " filename = excluded.filename,\n", " local_path = excluded.local_path,\n", " spatial_path = excluded.spatial_path,\n", " metadata = excluded.metadata,\n", " loaded_at = now()\n", " ''').format(layers=sql.SQL(LAYER_TABLE)), (\n", " 
row['layer_id'], row['state_code'], row['title'], row['format'],\n", "                row.get('datasetid'), row['source_url'], row['filename'],\n", "                row['local_path'], spatial_path, Json(metadata)\n", "            ))\n", "            cur.execute(sql.SQL('delete from {} where layer_id = %s').format(sql.SQL(FEATURE_TABLE)), (row['layer_id'],))\n", "            # Keep one MultiPolygon row per source precinct. ST_Dump would split multipart\n", "            # precincts into several rows sharing the same layer_id/source_row feature_id,\n", "            # which violates the feature_id primary key.\n", "            insert_sql = sql.SQL('''\n", "                insert into {features} (feature_id, layer_id, state_code, source_row, properties, geom)\n", "                with src as (\n", "                    select\n", "                        row_number() over (order by {order_col})::integer as source_row,\n", "                        to_jsonb(t) - %s - 'ogc_fid' - 'fid' as properties,\n", "                        ST_Force2D({geom_col}) as geom\n", "                    from {staging} t\n", "                    where {geom_col} is not null\n", "                ), fixed as (\n", "                    select\n", "                        source_row,\n", "                        properties,\n", "                        ST_Multi(ST_CollectionExtract(ST_MakeValid(geom), 3)) as geom\n", "                    from src\n", "                )\n", "                select\n", "                    %s || '/' || source_row::text,\n", "                    %s,\n", "                    %s,\n", "                    source_row,\n", "                    properties,\n", "                    ST_Multi(ST_Force2D(ST_SetSRID(geom, 4326)))::geometry(MultiPolygon, 4326)\n", "                from fixed\n", "                where geom is not null and not ST_IsEmpty(geom)\n", "            ''').format(\n", "                features=sql.SQL(FEATURE_TABLE),\n", "                staging=sql.Identifier('public', staging_table),\n", "                geom_col=sql.Identifier(geom_col),\n", "                order_col=sql.SQL(order_col) if order_col == 'ctid' else sql.Identifier(order_col),\n", "            )\n", "            cur.execute(insert_sql, (geom_col, row['layer_id'], row['layer_id'], row['state_code']))\n", "            inserted = cur.rowcount\n", "            cur.execute(sql.SQL('drop table if exists {}').format(sql.Identifier('public', staging_table)))\n", "            cur.execute(sql.SQL('analyze {}').format(sql.SQL(FEATURE_TABLE)))\n", "    return {'layer_id': row['layer_id'], 'status': 'loaded_ogr2ogr', 'features': inserted}\n", "\n", "\n", "def load_layer_to_postgis(row):\n", "    if not HAS_GEOPANDAS:\n", "        return load_layer_to_postgis_with_ogr(row)\n", "    spatial_path = row.get('spatial_path')\n", "    if not spatial_path or not Path(spatial_path).exists():\n", "        return {'layer_id': row['layer_id'], 'status': 'missing_spatial_file', 'features': 0}\n", "\n", "    gdf = gpd.read_file(spatial_path)\n", "    if gdf.empty:\n", "        return {'layer_id': row['layer_id'], 'status': 'empty_file', 'features': 0}\n", "    if gdf.crs is None:\n", "        print(f'Warning: {spatial_path} has no CRS; assuming EPSG:4326.')\n", "        gdf = gdf.set_crs(4326)\n", "    else:\n", "        gdf = gdf.to_crs(4326)\n", "\n", "    gdf = gdf[gdf.geometry.notna()].copy()\n", "    gdf = gdf[~gdf.geometry.is_empty].copy()\n", "    if gdf.empty:\n", "        return {'layer_id': row['layer_id'], 'status': 'no_valid_geometry', 'features': 0}\n", "\n", "    layer_metadata = {\n", "        'columns': [str(c) for c in gdf.columns],\n", "        'row_count': int(len(gdf)),\n", "        'source_spatial_path': spatial_path,\n", "    }\n", "\n", "    feature_rows = []\n", "    prop_cols = [c for c in gdf.columns if c != gdf.geometry.name]\n", "    for source_row, (_, record) in enumerate(gdf.iterrows(), start=1):\n", "        geom = record.geometry\n", "        if geom is None or geom.is_empty:\n", "            continue\n", "        properties = {str(col): json_safe(record[col]) for col in prop_cols}\n", "        feature_rows.append((\n", "            stable_feature_id(row['layer_id'], source_row, geom),\n", "            row['layer_id'],\n", "            row['state_code'],\n", "            source_row,\n", "            Json(properties),\n", "            json.dumps(geom.__geo_interface__),\n", "        ))\n", "\n", "    with get_conn() as conn:\n", "        with conn.cursor() as cur:\n", "            cur.execute(sql.SQL('''\n", "                insert into {layers} (\n", "                    layer_id, state_code, title, format, datasetid, source_url,\n", "                    filename, local_path, spatial_path, metadata, loaded_at\n", "                ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())\n", "                on conflict (layer_id) do update set\n", "                    state_code = excluded.state_code,\n", "                    title = excluded.title,\n", "                    format = excluded.format,\n", "                    datasetid = excluded.datasetid,\n", "                    source_url = excluded.source_url,\n", "                    filename = excluded.filename,\n", "                    local_path = excluded.local_path,\n", "                    
spatial_path = excluded.spatial_path,\n", " metadata = excluded.metadata,\n", " loaded_at = now()\n", " ''').format(layers=sql.SQL(LAYER_TABLE)), (\n", " row['layer_id'], row['state_code'], row['title'], row['format'],\n", " row.get('datasetid'), row['source_url'], row['filename'],\n", " row['local_path'], spatial_path, Json(layer_metadata)\n", " ))\n", " cur.execute(sql.SQL('delete from {} where layer_id = %s').format(sql.SQL(FEATURE_TABLE)), (row['layer_id'],))\n", " if feature_rows:\n", " execute_values(\n", " cur,\n", " sql.SQL('''\n", " insert into {features} (\n", " feature_id, layer_id, state_code, source_row, properties, geom\n", " ) values %s\n", " ''').format(features=sql.SQL(FEATURE_TABLE)).as_string(conn),\n", " feature_rows,\n", " template='(%s, %s, %s, %s, %s, ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Force2D(ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326))), 3)))',\n", " page_size=1000,\n", " )\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(FEATURE_TABLE)))\n", " return {'layer_id': row['layer_id'], 'status': 'loaded', 'features': len(feature_rows)}\n", "\n", "\n", "SKIP_ALREADY_LOADED_LAYERS = True\n", "\n", "load_results = []\n", "if LOAD_TO_POSTGIS:\n", " found = spatial_manifest[spatial_manifest['spatial_status'].eq('found')].copy()\n", " already_loaded = set()\n", " if SKIP_ALREADY_LOADED_LAYERS:\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(f'select layer_id from {LAYER_TABLE}')\n", " already_loaded = {r[0] for r in cur.fetchall()}\n", " print(f'Skipping {len(already_loaded)} layer(s) already in PostGIS')\n", " for row in found.to_dict('records'):\n", " if row['layer_id'] in already_loaded:\n", " load_results.append({'layer_id': row['layer_id'], 'status': 'already_loaded', 'features': None})\n", " continue\n", " print('Loading layer:', row['state_code'], row['title'])\n", " load_results.append(load_layer_to_postgis(row))\n", "else:\n", " print('LOAD_TO_POSTGIS=False; skipping spatial file load.')\n", 
"\n", "load_results = pd.DataFrame(load_results)\n", "display(load_results)\n" ] }, { "cell_type": "markdown", "id": "19", "metadata": {}, "source": [ "## Match Data Centers to Precinct Layers" ] }, { "cell_type": "code", "execution_count": null, "id": "20", "metadata": {}, "outputs": [], "source": [ "MATCH_SQL = f'''\n", "insert into {MATCH_TABLE} (\n", " master_id, feature_id, layer_id, state_code, join_method, match_distance_m, matched_at\n", ")\n", "select\n", " m.master_id,\n", " f.feature_id,\n", " f.layer_id,\n", " f.state_code,\n", " 'point_in_precinct' as join_method,\n", " 0.0 as match_distance_m,\n", " now()\n", "from {MASTER_TABLE} m\n", "join {FEATURE_TABLE} f\n", " on m.geom && f.geom\n", " and ST_Covers(f.geom, m.geom)\n", "where m.geom is not null\n", "on conflict (master_id, feature_id) do update set\n", " join_method = excluded.join_method,\n", " match_distance_m = excluded.match_distance_m,\n", " matched_at = now()\n", "'''\n", "\n", "if RUN_SPATIAL_MATCH:\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(MATCH_SQL)\n", " print('Inserted/updated matches:', cur.rowcount)\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(MATCH_TABLE)))\n", "else:\n", " print('RUN_SPATIAL_MATCH=False; skipping point-in-precinct matching.')\n" ] }, { "cell_type": "markdown", "id": "21", "metadata": {}, "source": [ "## Match Diagnostics" ] }, { "cell_type": "code", "execution_count": null, "id": "22", "metadata": {}, "outputs": [], "source": [ "diagnostics_sql = f'''\n", "with dc as (\n", " select state, count(*) as data_centers, ST_Extent(geom) as dc_extent\n", " from {MASTER_TABLE}\n", " where geom is not null\n", " group by state\n", "), feats as (\n", " select state_code as state, count(*) as precinct_features, ST_Extent(geom) as precinct_extent\n", " from {FEATURE_TABLE}\n", " where geom is not null\n", " group by state_code\n", "), matches as (\n", " select state_code as state, count(distinct master_id) as 
matched_data_centers\n", " from {MATCH_TABLE}\n", " group by state_code\n", ")\n", "select\n", " coalesce(dc.state, feats.state, matches.state) as state,\n", " coalesce(dc.data_centers, 0) as data_centers,\n", " coalesce(feats.precinct_features, 0) as precinct_features,\n", " coalesce(matches.matched_data_centers, 0) as matched_data_centers,\n", " dc.dc_extent::text as data_center_extent,\n", " feats.precinct_extent::text as precinct_extent\n", "from dc\n", "full join feats on feats.state = dc.state\n", "full join matches on matches.state = coalesce(dc.state, feats.state)\n", "order by state\n", "'''\n", "\n", "nearest_sql = f'''\n", "select\n", " m.master_id,\n", " m.name,\n", " m.city,\n", " m.state,\n", " nearest.layer_id,\n", " nearest.feature_id,\n", " nearest.distance_m\n", "from {MASTER_TABLE} m\n", "left join {MATCH_TABLE} matched on matched.master_id = m.master_id\n", "left join lateral (\n", " select\n", " f.layer_id,\n", " f.feature_id,\n", " ST_Distance(m.geom::geography, f.geom::geography) as distance_m\n", " from {FEATURE_TABLE} f\n", " where f.geom is not null\n", " order by m.geom <-> f.geom\n", " limit 1\n", ") nearest on true\n", "where m.geom is not null\n", " and matched.master_id is null\n", "order by nearest.distance_m nulls last\n", "limit 50\n", "'''\n", "\n", "with get_conn() as conn:\n", " diagnostics = pd.read_sql_query(diagnostics_sql, conn)\n", " nearest_unmatched = pd.read_sql_query(nearest_sql, conn)\n", "\n", "display(diagnostics)\n", "display(nearest_unmatched)\n", "\n", "if diagnostics['precinct_features'].sum() == 0:\n", " print('PostGIS has zero RDH precinct features loaded. Run the download, unpack, and load cells before matching.')\n", "elif diagnostics['matched_data_centers'].sum() == 0:\n", " print('Precinct features exist, but no point-in-polygon matches were found. 
Check the extents and nearest unmatched distances above.')\n" ] }, { "cell_type": "markdown", "id": "23", "metadata": {}, "source": [ "## Optional Nearest-Precinct Fallback" ] }, { "cell_type": "code", "execution_count": null, "id": "24", "metadata": {}, "outputs": [], "source": [ "NEAREST_FALLBACK_SQL = f'''\n", "insert into {MATCH_TABLE} (\n", " master_id, feature_id, layer_id, state_code, join_method, match_distance_m, matched_at\n", ")\n", "with candidates as (\n", " select m.master_id, m.geom, upper(m.state) as state_code\n", " from {MASTER_TABLE} m\n", " where m.geom is not null\n", " and m.state is not null\n", " and upper(m.state) in (select distinct state_code from {FEATURE_TABLE} where geom is not null)\n", " and not exists (\n", " select 1 from {MATCH_TABLE} existing where existing.master_id = m.master_id\n", " )\n", ")\n", "select\n", " c.master_id,\n", " nearest.feature_id,\n", " nearest.layer_id,\n", " nearest.state_code,\n", " 'nearest_precinct_within_threshold' as join_method,\n", " nearest.distance_m,\n", " now()\n", "from candidates c\n", "join lateral (\n", " select\n", " f.feature_id,\n", " f.layer_id,\n", " f.state_code,\n", " ST_Distance(c.geom::geography, f.geom::geography) as distance_m\n", " from {FEATURE_TABLE} f\n", " where f.geom is not null\n", " and f.state_code = c.state_code\n", " order by c.geom <-> f.geom\n", " limit 1\n", ") nearest on true\n", "where nearest.distance_m <= %s\n", "on conflict (master_id, feature_id) do update set\n", " join_method = excluded.join_method,\n", " match_distance_m = excluded.match_distance_m,\n", " matched_at = now()\n", "'''\n", "\n", "if RUN_SPATIAL_MATCH and RUN_NEAREST_PRECINCT_FALLBACK:\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(NEAREST_FALLBACK_SQL, (NEAREST_PRECINCT_MAX_DISTANCE_M,))\n", " print('Inserted/updated nearest fallback matches:', cur.rowcount)\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(MATCH_TABLE)))\n", "else:\n", " print('Nearest 
fallback disabled.')\n" ] }, { "cell_type": "markdown", "id": "25", "metadata": {}, "source": [ "## Inspect Matches and Census-Tract Context" ] }, { "cell_type": "code", "execution_count": null, "id": "26", "metadata": {}, "outputs": [], "source": [ "match_summary_sql = f'''\n", "select\n", " l.state_code,\n", " l.title,\n", " count(distinct f.feature_id) as precinct_features,\n", " count(distinct m.master_id) as matched_data_centers\n", "from {LAYER_TABLE} l\n", "left join {FEATURE_TABLE} f on f.layer_id = l.layer_id\n", "left join {MATCH_TABLE} m on m.feature_id = f.feature_id\n", "group by l.state_code, l.title\n", "order by l.state_code, matched_data_centers desc, l.title\n", "'''\n", "\n", "sample_sql = f'''\n", "select\n", " m.master_id,\n", " dc.name,\n", " dc.city,\n", " dc.state,\n", " l.title as rdh_layer_title,\n", " f.properties as precinct_properties\n", "from {MATCH_TABLE} m\n", "join {MASTER_TABLE} dc on dc.master_id = m.master_id\n", "join {FEATURE_TABLE} f on f.feature_id = m.feature_id\n", "join {LAYER_TABLE} l on l.layer_id = m.layer_id\n", "order by dc.state, dc.city, dc.name\n", "limit 25\n", "'''\n", "\n", "with get_conn() as conn:\n", " match_summary = pd.read_sql_query(match_summary_sql, conn)\n", " sample_matches = pd.read_sql_query(sample_sql, conn)\n", "\n", "display(match_summary)\n", "display(sample_matches)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "27", "metadata": {}, "outputs": [], "source": [ "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select to_regclass(%s)', (CENSUS_TRACT_TABLE,))\n", " has_census_tract_table = cur.fetchone()[0] is not None\n", "\n", "if has_census_tract_table:\n", " tract_context_sql = f'''\n", " select\n", " dc.master_id,\n", " dc.name,\n", " dc.city,\n", " dc.state,\n", " dc.geoid as census_tract_geoid,\n", " count(pm.feature_id) as precinct_layer_matches\n", " from {MASTER_TABLE} dc\n", " left join {CENSUS_TRACT_TABLE} ct on ct.geoid = dc.geoid\n", " left 
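join {MATCH_TABLE} pm on pm.master_id = dc.master_id\n", " group by dc.master_id, dc.name, dc.city, dc.state, dc.geoid, ct.geoid\n", " order by precinct_layer_matches asc, dc.state, dc.city\n", " limit 50\n", " '''\n", " with get_conn() as conn:\n", " tract_context = pd.read_sql_query(tract_context_sql, conn)\n", " display(tract_context)\n", "else:\n", " print(f'{CENSUS_TRACT_TABLE} not found; census tract fallback/context skipped.')\n" ] }, { "cell_type": "markdown", "id": "28", "metadata": {}, "source": [ "## Standardized Vote Fields\n", "\n", "The cell below extracts a standardized set of election attributes from `precinct_properties` using heuristic key matching across RDH file families.\n", "\n", "Extracted fields:\n", "- precinct identifier/name\n", "- election year\n", "- office\n", "- Democratic votes\n", "- Republican votes\n", "- total votes\n", "- turnout or vote share\n", "\n", "Column names that follow the RDH/VEST convention (for example `G20PREDBID`: 2020 general election, President, Democratic candidate) are decoded first. Otherwise, party is inferred from substrings such as `dem`, `rep`, or candidate names.\n", "\n", "Because RDH schemas vary by state and source, this step is intentionally tolerant. When a layer has no direct total-vote field, `total_votes` falls back to Democratic + Republican votes. When it has no turnout/share field, `turnout_or_vote_share` falls back to `total_votes / reg_voters` (turnout) if a `reg_voters` field exists, and otherwise to the two-party Democratic share `D / (D + R)`. As a result, values in that column are not directly comparable across layers."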
] }, { "cell_type": "code", "execution_count": null, "id": "29", "metadata": {}, "outputs": [], "source": [ "STANDARDIZED_LIMIT = None # set an int (e.g., 2000) for faster sampling\n", "\n", "limit_clause = '' if STANDARDIZED_LIMIT is None else 'limit %s'\n", "standardized_sql = f'''\n", "select\n", " m.master_id,\n", " dc.name,\n", " dc.city,\n", " dc.state,\n", " l.title as rdh_layer_title,\n", " f.properties as precinct_properties\n", "from {MATCH_TABLE} m\n", "join {MASTER_TABLE} dc on dc.master_id = m.master_id\n", "join {FEATURE_TABLE} f on f.feature_id = m.feature_id\n", "join {LAYER_TABLE} l on l.layer_id = m.layer_id\n", "order by dc.state, dc.city, dc.name\n", "{limit_clause}\n", "'''\n", "\n", "with get_conn() as conn:\n", " if STANDARDIZED_LIMIT is None:\n", " raw_standardized = pd.read_sql_query(standardized_sql, conn)\n", " else:\n", " raw_standardized = pd.read_sql_query(standardized_sql, conn, params=[STANDARDIZED_LIMIT])\n", "\n", "\n", "def parse_props(value):\n", " if isinstance(value, dict):\n", " return value\n", " if pd.isna(value):\n", " return {}\n", " text = str(value).strip()\n", " if not text:\n", " return {}\n", " try:\n", " obj = json.loads(text)\n", " return obj if isinstance(obj, dict) else {}\n", " except Exception:\n", " return {}\n", "\n", "\n", "def norm_key(k):\n", " return re.sub(r'[^a-z0-9]+', '_', str(k).strip().lower()).strip('_')\n", "\n", "\n", "def as_number(v):\n", " if v is None:\n", " return None\n", " if isinstance(v, (int, float, np.integer, np.floating)):\n", " if pd.isna(v):\n", " return None\n", " return float(v)\n", " text = str(v).strip().replace(',', '')\n", " if text == '':\n", " return None\n", " if re.fullmatch(r'-?\\d+(\\.\\d+)?', text):\n", " return float(text)\n", " return None\n", "\n", "\n", "def parse_year_from_title(title):\n", " m = re.search(r'\\b((?:19|20)\\d{2})\\b', str(title))\n", " return int(m.group(1)) if m else None\n", "\n", "\n", "def infer_year_from_keys(props_norm):\n", " key_patterns = 
[\n", " re.compile(r'^[pg](\\d{2})(pre|uss|con|gov|ag|sos|ltg|tre|aud).*'),\n", " re.compile(r'^[pg](\\d{2}).*'),\n", " ]\n", " for key in props_norm.keys():\n", " nk = norm_key(key)\n", " for pat in key_patterns:\n", " m = pat.match(nk)\n", " if m:\n", " yy = int(m.group(1))\n", " return 2000 + yy if yy < 60 else 1900 + yy\n", " return None\n", "\n", "\n", "def decode_rdh_vote_key(key):\n", " k = norm_key(key)\n", "\n", " m = re.match(r'^[pg](\\d{2})pre([a-z]).*', k)\n", " if m:\n", " party_code = m.group(2)\n", " return ('President', party_code)\n", "\n", " m = re.match(r'^[pg](\\d{2})uss([a-z]).*', k)\n", " if m:\n", " party_code = m.group(2)\n", " return ('U.S. Senate', party_code)\n", "\n", " m = re.match(r'^[pg](\\d{2})con(\\d{2})([a-z]).*', k)\n", " if m:\n", " district = m.group(2)\n", " party_code = m.group(3)\n", " return (f'U.S. House District {district}', party_code)\n", "\n", " return (None, None)\n", "\n", "\n", "def party_from_key(key):\n", " k = norm_key(key)\n", " office, party_code = decode_rdh_vote_key(k)\n", " if party_code == 'd':\n", " return office, 'D'\n", " if party_code == 'r':\n", " return office, 'R'\n", "\n", " if any(t in k for t in ['biden', 'dem', 'democrat']):\n", " return office, 'D'\n", " if any(t in k for t in ['trump', 'gop', 'rep', 'republican']):\n", " return office, 'R'\n", "\n", " return office, None\n", "\n", "\n", "def detect_office(title, props_norm, vote_office_totals):\n", " title_lower = str(title).lower()\n", " if 'president' in title_lower or 'presidential' in title_lower:\n", " return 'President'\n", " if 'senate' in title_lower:\n", " return 'U.S. Senate'\n", " if 'house' in title_lower or 'congress' in title_lower:\n", " return 'U.S. 
House'\n", " if 'governor' in title_lower:\n", " return 'Governor'\n", "\n", " if vote_office_totals:\n", " return max(vote_office_totals.items(), key=lambda x: x[1])[0]\n", "\n", " office_key_hits = [k for k in props_norm if any(x in k for x in ['office', 'contest', 'race'])]\n", " if office_key_hits:\n", " best = office_key_hits[0]\n", " val = props_norm.get(best)\n", " if isinstance(val, str) and val.strip():\n", " return val.strip()\n", " return None\n", "\n", "\n", "def _text_or_none(value):\n", " # str(None) is the non-empty string 'None', so missing values are handled explicitly.\n", " if value is None:\n", " return None\n", " text = str(value).strip()\n", " return text or None\n", "\n", "\n", "def best_precinct_identifier(props_norm):\n", " preferred_keys = [\n", " 'precinct', 'precinct_name', 'precinctid', 'precinct_id', 'precinct20',\n", " 'pctname', 'pct', 'vtd', 'vtdst', 'vtdst20', 'name20',\n", " 'district', 'district_name', 'ward', 'geoid', 'geoid20', 'unique_id',\n", " ]\n", " for key in preferred_keys:\n", " text = _text_or_none(props_norm.get(key))\n", " if text:\n", " return text\n", "\n", " # Fall back to the first identifier-like key with a non-empty value.\n", " for key, value in props_norm.items():\n", " if any(t in key for t in ['precinct', 'vtd', 'ward', 'district', 'geo', 'name']):\n", " text = _text_or_none(value)\n", " if text:\n", " return text\n", " return None\n", "\n", "\n", "def extract_vote_fields(row):\n", " props = parse_props(row['precinct_properties'])\n", " props_norm = {norm_key(k): v for k, v in props.items()}\n", "\n", " precinct_id_or_name = best_precinct_identifier(props_norm)\n", " election_year = parse_year_from_title(row['rdh_layer_title'])\n", " if election_year is None:\n", " election_year = infer_year_from_keys(props_norm)\n", "\n", " year_keys = [k for k in props_norm if 'year' in k]\n", " if election_year is None and year_keys:\n", " for k in year_keys:\n", " y = as_number(props_norm[k])\n", " if y and 1900 <= y <= 2100:\n", " election_year = int(y)\n", " break\n", "\n", " numeric_items = [(k, as_number(v)) for k, v in props_norm.items()]\n", " numeric_items = [(k, v) for k, v in numeric_items if v is not None]\n", "\n", "
dem_votes = None\n", " rep_votes = None\n", " vote_office_totals = {}\n", " office_party_votes = {} # office -> {'D': votes, 'R': votes}\n", "\n", " for key, value in numeric_items:\n", " office_guess, party_guess = party_from_key(key)\n", " if party_guess == 'D':\n", " dem_votes = value if dem_votes is None else max(dem_votes, value)\n", " elif party_guess == 'R':\n", " rep_votes = value if rep_votes is None else max(rep_votes, value)\n", "\n", " if office_guess is not None and party_guess in {'D', 'R'}:\n", " vote_office_totals[office_guess] = vote_office_totals.get(office_guess, 0.0) + float(value)\n", " party_votes = office_party_votes.setdefault(office_guess, {})\n", " party_votes[party_guess] = max(party_votes.get(party_guess, 0.0), float(value))\n", "\n", " # Pick the office first so D/R votes come from a single contest when the keys identify one,\n", " # instead of the maximum across every office in the layer.\n", " office = detect_office(row['rdh_layer_title'], props_norm, vote_office_totals)\n", " if office in office_party_votes:\n", " dem_votes = office_party_votes[office].get('D', dem_votes)\n", " rep_votes = office_party_votes[office].get('R', rep_votes)\n", "\n", " total_candidates = [\n", " v for k, v in numeric_items\n", " if ('tot' in k and 'vote' in k) or k in {'votes_total', 'total_votes', 'vote_total'}\n", " ]\n", " total_votes = max(total_candidates) if total_candidates else None\n", " if total_votes is None and dem_votes is not None and rep_votes is not None:\n", " total_votes = dem_votes + rep_votes\n", "\n", " # Treat 'pct' as a share only when it is its own token with a qualifier (e.g. turnout_pct, pct_dem)\n", " # and the value is in a plausible 0-100 range; a bare 'pct' or pct_id-style key is usually a precinct identifier.\n", " id_tokens = {'id', 'num', 'no', 'name', 'code'}\n", " turnout_candidates = []\n", " for k, v in numeric_items:\n", " tokens = set(k.split('_'))\n", " looks_like_share = 'turnout' in k or 'share' in k or ('pct' in tokens and len(tokens) > 1)\n", " if looks_like_share and not (tokens & id_tokens) and 0 <= v <= 100:\n", " turnout_candidates.append(v)\n", " turnout_or_vote_share = turnout_candidates[0] if turnout_candidates else None\n", "\n", " if turnout_or_vote_share is None:\n", " reg_voters = props_norm.get('reg_voters')\n", " reg_voters_num = as_number(reg_voters)\n", " if reg_voters_num and total_votes:\n", " turnout_or_vote_share = total_votes / reg_voters_num\n", " elif dem_votes is not None and rep_votes is not None and (dem_votes + rep_votes) > 0:\n", " turnout_or_vote_share = dem_votes / (dem_votes + rep_votes)\n", "\n", " return pd.Series({\n", " 'precinct_identifier_name': precinct_id_or_name,\n", " 'election_year': election_year,\n", " 'office': office,\n", " 'democratic_votes': dem_votes,\n", " 'republican_votes': rep_votes,\n", " 'total_votes': total_votes,\n", "
'turnout_or_vote_share': turnout_or_vote_share,\n", " })\n", "\n", "\n", "standardized_fields = raw_standardized.apply(extract_vote_fields, axis=1)\n", "standardized_preview = pd.concat(\n", " [\n", " raw_standardized[['master_id', 'name', 'city', 'state', 'rdh_layer_title']],\n", " standardized_fields,\n", " ],\n", " axis=1,\n", ")\n", "\n", "standardized_summary = pd.DataFrame({\n", " 'field': [\n", " 'precinct_identifier_name', 'election_year', 'office',\n", " 'democratic_votes', 'republican_votes', 'total_votes', 'turnout_or_vote_share',\n", " ]\n", "})\n", "standardized_summary['non_null_rows'] = standardized_summary['field'].map(\n", " lambda c: int(standardized_preview[c].notna().sum())\n", ")\n", "\n", "print(f'Standardized preview rows: {len(standardized_preview):,}')\n", "display(standardized_summary)\n", "display(standardized_preview.head(50))" ] }, { "cell_type": "code", "execution_count": null, "id": "30", "metadata": {}, "outputs": [], "source": [ "ELECTION_CONTEXT_TABLE = 'public.data_center_election_context'\n", "\n", "required_cols = [\n", " 'master_id', 'rdh_layer_title',\n", " 'precinct_identifier_name', 'election_year', 'office',\n", " 'democratic_votes', 'republican_votes', 'total_votes', 'turnout_or_vote_share',\n", "]\n", "\n", "missing_cols = [c for c in required_cols if c not in standardized_preview.columns]\n", "if missing_cols:\n", " raise RuntimeError(\n", " 'standardized_preview is missing required columns: '\n", " + ', '.join(missing_cols)\n", " + '. 
Run the standardized extraction cell first.'\n", " )\n", "\n", "persist_best = standardized_preview[required_cols].copy()\n", "persist_best['non_null_score'] = persist_best[\n", " ['precinct_identifier_name', 'election_year', 'office', 'democratic_votes', 'republican_votes', 'total_votes', 'turnout_or_vote_share']\n", "].notna().sum(axis=1)\n", "\n", "persist_best = persist_best.sort_values(\n", " ['master_id', 'non_null_score', 'total_votes'],\n", " ascending=[True, False, False],\n", " na_position='last'\n", ")\n", "persist_best = persist_best.drop_duplicates(subset=['master_id'], keep='first').copy()\n", "\n", "with get_conn() as conn:\n", " master_base = pd.read_sql_query(\n", " f'''\n", " select master_id, name, city, upper(state) as state\n", " from {MASTER_TABLE}\n", " ''',\n", " conn,\n", " )\n", "\n", "persist_df = master_base.merge(\n", " persist_best.drop(columns=['non_null_score']),\n", " on='master_id',\n", " how='left',\n", ")\n", "\n", "create_sql = f'''\n", "create table if not exists {ELECTION_CONTEXT_TABLE} (\n", " master_id text primary key references public.master_data_centers(master_id) on delete cascade,\n", " name text,\n", " city text,\n", " state text,\n", " rdh_layer_title text,\n", " precinct_identifier_name text,\n", " election_year integer,\n", " office text,\n", " democratic_votes double precision,\n", " republican_votes double precision,\n", " total_votes double precision,\n", " turnout_or_vote_share double precision,\n", " updated_at timestamptz not null default now()\n", ");\n", "create index if not exists data_center_election_context_state_idx\n", " on {ELECTION_CONTEXT_TABLE} (state);\n", "create index if not exists data_center_election_context_year_idx\n", " on {ELECTION_CONTEXT_TABLE} (election_year);\n", "'''\n", "\n", "upsert_sql = f'''\n", "insert into {ELECTION_CONTEXT_TABLE} (\n", " master_id, name, city, state, rdh_layer_title,\n", " precinct_identifier_name, election_year, office,\n", " democratic_votes, republican_votes, 
total_votes, turnout_or_vote_share,\n", " updated_at\n", ")\n", "values %s\n", "on conflict (master_id) do update set\n", " name = excluded.name,\n", " city = excluded.city,\n", " state = excluded.state,\n", " rdh_layer_title = excluded.rdh_layer_title,\n", " precinct_identifier_name = excluded.precinct_identifier_name,\n", " election_year = excluded.election_year,\n", " office = excluded.office,\n", " democratic_votes = excluded.democratic_votes,\n", " republican_votes = excluded.republican_votes,\n", " total_votes = excluded.total_votes,\n", " turnout_or_vote_share = excluded.turnout_or_vote_share,\n", " updated_at = now()\n", "'''\n", "\n", "\n", "def na_to_none(value):\n", " # After the left merge, unmatched data centers get float NaN in text columns.\n", " # psycopg2 adapts NaN to a float literal rather than NULL, so text columns would store the string 'NaN'.\n", " return None if pd.isna(value) else value\n", "\n", "\n", "rows = []\n", "for rec in persist_df.to_dict('records'):\n", " election_year = na_to_none(rec.get('election_year'))\n", " rows.append((\n", " rec['master_id'],\n", " na_to_none(rec['name']),\n", " na_to_none(rec['city']),\n", " na_to_none(rec['state']),\n", " na_to_none(rec.get('rdh_layer_title')),\n", " na_to_none(rec.get('precinct_identifier_name')),\n", " int(election_year) if election_year is not None else None,\n", " na_to_none(rec.get('office')),\n", " float(rec['democratic_votes']) if pd.notna(rec.get('democratic_votes')) else None,\n", " float(rec['republican_votes']) if pd.notna(rec.get('republican_votes')) else None,\n", " float(rec['total_votes']) if pd.notna(rec.get('total_votes')) else None,\n", " float(rec['turnout_or_vote_share']) if pd.notna(rec.get('turnout_or_vote_share')) else None,\n", " ))\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(create_sql)\n", " if rows:\n", " execute_values(\n", " cur,\n", " upsert_sql,\n", " rows,\n", " template='(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())',\n", " page_size=1000,\n", " )\n", " cur.execute(f'select count(*) from {ELECTION_CONTEXT_TABLE}')\n", " table_rows = cur.fetchone()[0]\n", " cur.execute(\n", " f'''\n", " select\n", " state,\n", " count(*) as rows,\n", " count(*) filter (\n", " where election_year is not null\n", " or office is not null\n", " or democratic_votes is not null\n", " or republican_votes is not null\n", "
or total_votes is not null\n", " or turnout_or_vote_share is not null\n", " ) as rows_with_election\n", " from {ELECTION_CONTEXT_TABLE}\n", " group by state\n", " order by rows desc, state\n", " limit 15\n", " '''\n", " )\n", " state_counts = cur.fetchall()\n", "\n", "rows_with_election = int(\n", " persist_df[\n", " ['election_year', 'office', 'democratic_votes', 'republican_votes', 'total_votes', 'turnout_or_vote_share']\n", " ].notna().any(axis=1).sum()\n", ")\n", "print(f'Rows prepared for upsert: {len(rows):,}')\n", "print(f'Rows with election context: {rows_with_election:,}')\n", "print(f'Rows currently in {ELECTION_CONTEXT_TABLE}: {table_rows:,}')\n", "display(pd.DataFrame(state_counts, columns=['state', 'rows', 'rows_with_election']))" ] }, { "cell_type": "markdown", "id": "31", "metadata": {}, "source": [ "## Persist Standardized Election Context\n", "\n", "The cell above writes one standardized election-context row per `master_id` into `public.data_center_election_context` for reuse in map and reporting workflows. When a data center matches several precinct layers, it keeps the row with the most populated election fields, breaking ties by higher `total_votes`. Data centers with no match still get a row, with null election fields."
] }, { "cell_type": "code", "execution_count": null, "id": "32", "metadata": {}, "outputs": [], "source": [ "# Targeted state coverage check\n", "states = ['VA', 'WA', 'WI', 'WV', 'WY', 'DC', 'PR']\n", "\n", "with get_conn() as conn:\n", " check_df = pd.read_sql_query(\n", " f'''\n", " select\n", " state,\n", " count(*) as rows,\n", " count(*) filter (\n", " where election_year is not null\n", " or office is not null\n", " or democratic_votes is not null\n", " or republican_votes is not null\n", " or total_votes is not null\n", " or turnout_or_vote_share is not null\n", " ) as rows_with_election\n", " from {ELECTION_CONTEXT_TABLE}\n", " where state = any(%s)\n", " group by state\n", " order by state\n", " ''',\n", " conn,\n", " params=[states],\n", " )\n", "\n", "display(check_df)" ] }, { "cell_type": "markdown", "id": "33", "metadata": {}, "source": [ "## Tables Created by This Notebook and Their Relationships\n", "\n", "This notebook creates and/or maintains the following PostGIS/PostgreSQL tables:\n", "\n", "1. `public.rdh_precinct_vote_layers`\n", "- One row per RDH precinct-election layer ingested.\n", "- Key columns: `layer_id` (PK), `state_code`, `title`, `format`, file/source metadata, `loaded_at`.\n", "\n", "2. `public.rdh_precinct_vote_features`\n", "- One row per precinct polygon feature from a loaded layer.\n", "- Key columns: `feature_id` (PK), `layer_id` (FK), `state_code`, `source_row`, `properties` (JSONB), `geom` (MultiPolygon).\n", "- Relationship: many features belong to one layer.\n", "\n", "3. `public.data_center_rdh_precinct_vote_matches`\n", "- Spatial match table linking data centers to precinct features.\n", "- Key columns: `master_id` (FK), `feature_id` (FK), `layer_id` (FK), `state_code`, `join_method`, `match_distance_m`, `matched_at`.\n", "- Primary key: (`master_id`, `feature_id`).\n", "- Relationship: many-to-many bridge between data centers and precinct features (with match metadata).\n", "\n", "4. 
`public.data_center_election_context`\n", "- Final standardized, one-row-per-data-center election context used by downstream mapping/analysis.\n", "- Key columns: `master_id` (PK, FK), `name`, `city`, `state`, `rdh_layer_title`,\n", " `precinct_identifier_name`, `election_year`, `office`, `democratic_votes`, `republican_votes`,\n", " `total_votes`, `turnout_or_vote_share`, `updated_at`.\n", "- Relationship: one row per `master_id` in `public.master_data_centers` (left-joined so all master rows can be retained, even if election fields are null).\n", "\n", "### Relationship Summary\n", "\n", "- `public.master_data_centers (master_id)`\n", " - 1-to-many -> `public.data_center_rdh_precinct_vote_matches (master_id)`\n", " - 1-to-1 (effective in this notebook) -> `public.data_center_election_context (master_id)`\n", "\n", "- `public.rdh_precinct_vote_layers (layer_id)`\n", " - 1-to-many -> `public.rdh_precinct_vote_features (layer_id)`\n", " - 1-to-many -> `public.data_center_rdh_precinct_vote_matches (layer_id)`\n", "\n", "- `public.rdh_precinct_vote_features (feature_id)`\n", " - 1-to-many -> `public.data_center_rdh_precinct_vote_matches (feature_id)`\n", "\n", "In short: **layers -> features -> matches**, then matches are standardized into **one election-context row per data center**." ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.14.5" } }, "nbformat": 4, "nbformat_minor": 5 }