{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# RDH Precinct Vote Data for Master Data Centers\n", "\n", "This notebook discovers, downloads, stages, and spatially joins Redistricting Data Hub precinct-level election data to `public.master_data_centers`.\n", "\n", "It is designed to be rerunnable as new data centers are added:\n", "- Data-center locations come from `public.master_data_centers`.\n", "- RDH credentials are read from `RDH_USERNAME` (or `RDH_EMAIL`) and `RDH_PASSWORD`; if either is unset, the notebook prompts for it, reading the password with `getpass`.\n", "- RDH files are downloaded into `data/rdh_precinct_vote/`.\n", "- Original precinct attributes are preserved in `properties jsonb` because RDH vote-column names vary by state/year/election.\n", "- Matches are written to `public.data_center_rdh_precinct_vote_matches`, joinable by `master_id`.\n", "\n", "The primary join method is point-in-precinct using longitude/latitude. Census tract context is included as a fallback/diagnostic path when the existing census tract table is available.\n", "\n", "Local RDH API references used for this notebook:\n", "- `/home/dadams/Repos/api-redistricting_datahub/RDH_API.ipynb`\n", "- `/home/dadams/Repos/api-redistricting_datahub/RDH_API_SET_PARAMS.ipynb`\n", "- `/home/dadams/Repos/api-redistricting_datahub/Download_API.txt`\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "import hashlib\n", "import io\n", "import json\n", "import os\n", "import re\n", "import shutil\n", "import subprocess\n", "import time\n", "import zipfile\n", "from getpass import getpass\n", "from pathlib import Path\n", "from urllib.parse import parse_qs, urlparse, unquote\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import psycopg2\n", "import requests\n", "from psycopg2 import sql\n", "from psycopg2.extras import Json, execute_values\n", "\n", "try:\n", " import geopandas as gpd\n", " HAS_GEOPANDAS = True\n", "except ImportError:\n", " gpd = None\n", " 
HAS_GEOPANDAS = False\n", "\n", "pd.set_option('display.max_columns', 120)\n", "pd.set_option('display.max_rows', 120)\n", "\n", "print('pandas:', pd.__version__)\n", "print('requests:', requests.__version__)\n", "print('geopandas:', 'ok' if HAS_GEOPANDAS else 'not installed; spatial file loading cells will be skipped')\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2", "metadata": {}, "outputs": [], "source": [ "def load_env_file(env_path: str = '.env') -> None:\n", " p = Path(env_path)\n", " if not p.exists():\n", " print(f'No {env_path} file found in {Path.cwd()}')\n", " return\n", "\n", " loaded = 0\n", " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", " line = raw_line.strip()\n", " if not line or line.startswith('#') or '=' not in line:\n", " continue\n", " key, value = line.split('=', 1)\n", " key = key.strip()\n", " value = value.strip().strip('\"').strip(\"'\")\n", " if key and key not in os.environ:\n", " os.environ[key] = value\n", " loaded += 1\n", " print(f'Loaded {loaded} env var(s) from {env_path}')\n", "\n", "\n", "def require_env(keys):\n", " missing = [k for k in keys if not os.getenv(k)]\n", " if missing:\n", " raise EnvironmentError(\n", " 'Missing required env vars in notebook kernel: ' + ', '.join(missing) +\n", " '.\\nSet them in this notebook, or add them to a .env file in this folder.'\n", " )\n", "\n", "\n", "load_env_file('.env')\n", "require_env(['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD'])\n", "\n", "DB_NAME = 'data_centers'\n", "MASTER_TABLE = 'public.master_data_centers'\n", "CENSUS_TRACT_TABLE = 'public.data_center_census_tracts_2024'\n", "\n", "LAYER_TABLE = 'public.rdh_precinct_vote_layers'\n", "FEATURE_TABLE = 'public.rdh_precinct_vote_features'\n", "MATCH_TABLE = 'public.data_center_rdh_precinct_vote_matches'\n", "\n", "\n", "def get_conn():\n", " return psycopg2.connect(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " 
user=os.environ['PGWEB_USER'],\n", " password=os.environ['PGWEB_PASSWORD'],\n", " dbname=DB_NAME,\n", " )\n", "\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select current_database(), current_user')\n", " db, usr = cur.fetchone()\n", " print('Connected to DB:', db)\n", " print('As user:', usr)\n", " cur.execute('create extension if not exists postgis')\n", " cur.execute('select to_regclass(%s)', (MASTER_TABLE,))\n", " if cur.fetchone()[0] is None:\n", " raise RuntimeError(f'{MASTER_TABLE} does not exist. Run build_master_data_centers.py first.')\n", " cur.execute(sql.SQL('select count(*) from {}').format(sql.SQL(MASTER_TABLE)))\n", " print(f'{MASTER_TABLE} rows:', f'{cur.fetchone()[0]:,}')" ] }, { "cell_type": "markdown", "id": "3", "metadata": {}, "source": [ "## Parameters\n", "\n", "The defaults run a 2020 pilot: `FILTER_YEARS_ANY = ['2020']` limits discovery to one election year, and `TARGET_STATES = None` infers every state present in `public.master_data_centers`. For a smaller first run, set `TARGET_STATES = ['VA']`; Virginia has many data centers in the master table, so a statewide precinct layer should produce visible matches. After the pilot works, broaden `TARGET_STATES` and `FILTER_YEARS_ANY`.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4", "metadata": {}, "outputs": [], "source": [ "RDH_LIST_URL = 'https://redistrictingdatahub.org/wp-json/download/list'\n", "\n", "DATA_DIR = Path('data/rdh_precinct_vote')\n", "RAW_DIR = DATA_DIR / 'raw'\n", "EXTRACT_DIR = DATA_DIR / 'extracted'\n", "MANIFEST_PATH = DATA_DIR / 'rdh_precinct_vote_download_manifest.csv'\n", "LISTING_CACHE_PATH = DATA_DIR / 'rdh_precinct_vote_listing_cache.csv'\n", "\n", "TARGET_STATES = None # None = infer all states from master_data_centers; or list e.g. ['VA','TX']\n", "FILTER_TERMS_ALL = ['election results', 'precinct']\n", "FILTER_TERMS_ANY = [] # e.g. 
['general', 'president']\n", "FILTER_YEARS_ANY = ['2020'] # pilot first; empty keeps all years returned by RDH\n", "PREFERRED_FORMATS = ['SHP'] # point-in-precinct joins need spatial files\n", "\n", "DOWNLOAD_FILES = True\n", "OVERWRITE_DOWNLOADS = False\n", "LOAD_TO_POSTGIS = True\n", "RUN_SPATIAL_MATCH = True\n", "RUN_NEAREST_PRECINCT_FALLBACK = True\n", "NEAREST_PRECINCT_MAX_DISTANCE_M = 500\n", "\n", "REQUEST_SLEEP_SECONDS = 1.0\n", "\n", "for p in [DATA_DIR, RAW_DIR, EXTRACT_DIR]:\n", " p.mkdir(parents=True, exist_ok=True)\n", "\n", "print('Data directory:', DATA_DIR.resolve())\n", "print('Download files:', DOWNLOAD_FILES)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "STATE_NAME_TO_CODE = {\n", " 'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',\n", " 'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',\n", " 'district of columbia': 'DC', 'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI',\n", " 'idaho': 'ID', 'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA',\n", " 'kansas': 'KS', 'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME',\n", " 'maryland': 'MD', 'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN',\n", " 'mississippi': 'MS', 'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE',\n", " 'nevada': 'NV', 'new hampshire': 'NH', 'new jersey': 'NJ', 'new mexico': 'NM',\n", " 'new york': 'NY', 'north carolina': 'NC', 'north dakota': 'ND', 'ohio': 'OH',\n", " 'oklahoma': 'OK', 'oregon': 'OR', 'pennsylvania': 'PA', 'rhode island': 'RI',\n", " 'south carolina': 'SC', 'south dakota': 'SD', 'tennessee': 'TN', 'texas': 'TX',\n", " 'utah': 'UT', 'vermont': 'VT', 'virginia': 'VA', 'washington': 'WA',\n", " 'west virginia': 'WV', 'wisconsin': 'WI', 'wyoming': 'WY',\n", "}\n", "STATE_CODE_TO_NAME = {v: k for k, v in STATE_NAME_TO_CODE.items()}\n", "\n", "\n", "def normalize_state_code(value):\n", " if pd.isna(value) or str(value).strip() == '':\n", " return 
None\n", " raw = str(value).strip()\n", " upper = raw.upper()\n", " if upper in STATE_CODE_TO_NAME:\n", " return upper\n", " return STATE_NAME_TO_CODE.get(raw.lower())\n", "\n", "\n", "def infer_target_states():\n", " if TARGET_STATES:\n", " return sorted({normalize_state_code(s) for s in TARGET_STATES if normalize_state_code(s)})\n", " query = f'''\n", " select state, count(*) as rows\n", " from {MASTER_TABLE}\n", " where geom is not null\n", " group by state\n", " order by state\n", " '''\n", " with get_conn() as conn:\n", " states = pd.read_sql_query(query, conn)\n", " states['state_code'] = states['state'].map(normalize_state_code)\n", " display(states)\n", " missing = states.loc[states['state_code'].isna() & states['state'].notna()]\n", " if not missing.empty:\n", " print('Warning: could not normalize these state values:')\n", " display(missing)\n", " return sorted(states['state_code'].dropna().unique())\n", "\n", "\n", "target_states = infer_target_states()\n", "print('Target state codes:', ', '.join(target_states))\n" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "## RDH Credentials" ] }, { "cell_type": "code", "execution_count": null, "id": "7", "metadata": {}, "outputs": [], "source": [ "RDH_USERNAME = os.getenv('RDH_USERNAME') or os.getenv('RDH_EMAIL')\n", "RDH_PASSWORD = os.getenv('RDH_PASSWORD')\n", "\n", "if not RDH_USERNAME:\n", " RDH_USERNAME = input('RDH username or email: ').strip()\n", "if not RDH_PASSWORD:\n", " RDH_PASSWORD = getpass('RDH password: ')\n", "\n", "if not RDH_USERNAME or not RDH_PASSWORD:\n", " raise RuntimeError('RDH credentials are required. 
Use prompts or set RDH_USERNAME/RDH_PASSWORD.')\n", "\n", "print('RDH username loaded:', RDH_USERNAME)\n", "print('RDH password loaded:', bool(RDH_PASSWORD))\n" ] }, { "cell_type": "markdown", "id": "8", "metadata": {}, "source": [ "## Discover RDH Datasets" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "def get_rdh_list_for_state(state_code):\n", " params = {\n", " 'username': RDH_USERNAME,\n", " 'password': RDH_PASSWORD,\n", " 'format': 'csv',\n", " 'states': STATE_CODE_TO_NAME.get(state_code, state_code).lower(),\n", " }\n", " response = requests.get(RDH_LIST_URL, params=params, timeout=120)\n", " response.raise_for_status()\n", " text = response.content.decode('utf-8', errors='replace')\n", " df = pd.read_csv(io.StringIO(text))\n", " df['query_state_code'] = state_code\n", " df['query_state_name'] = STATE_CODE_TO_NAME.get(state_code, state_code)\n", " time.sleep(REQUEST_SLEEP_SECONDS)\n", " return df\n", "\n", "\n", "def load_or_fetch_rdh_listing(refresh=False):\n", " cached = None\n", " if LISTING_CACHE_PATH.exists() and not refresh:\n", " cached = pd.read_csv(LISTING_CACHE_PATH)\n", " cached_states = set(cached.get('query_state_code', pd.Series(dtype=str)).dropna().unique())\n", " missing = [s for s in target_states if s not in cached_states]\n", " print(f'Cache has {len(cached):,} rows across {len(cached_states)} state(s); missing: {missing or \"none\"}')\n", " else:\n", " cached_states = set()\n", " missing = list(target_states)\n", "\n", " if missing:\n", " frames = [] if cached is None else [cached]\n", " for state_code in missing:\n", " print('Retrieving RDH listing for', state_code)\n", " frames.append(get_rdh_list_for_state(state_code))\n", " cached = pd.concat(frames, ignore_index=True)\n", " LISTING_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)\n", " cached.to_csv(LISTING_CACHE_PATH, index=False)\n", " print('Updated listing cache:', LISTING_CACHE_PATH)\n", "\n", " if cached is not 
None and 'query_state_code' in cached.columns:\n", " cached = cached[cached['query_state_code'].isin(target_states)].copy()\n", " return cached if cached is not None else pd.DataFrame()\n", "\n", "\n", "listing = load_or_fetch_rdh_listing(refresh=False)\n", "print(f'RDH listing rows for target states: {len(listing):,}')\n", "display(listing.head())\n", "display(pd.DataFrame({'column': listing.columns}))\n" ] }, { "cell_type": "code", "execution_count": null, "id": "10", "metadata": {}, "outputs": [], "source": [ "def text_contains_all(text, terms):\n", " text = str(text).lower()\n", " return all(term.lower() in text for term in terms if term)\n", "\n", "\n", "def text_contains_any(text, terms):\n", " terms = [term for term in terms if term]\n", " if not terms:\n", " return True\n", " text = str(text).lower()\n", " return any(term.lower() in text for term in terms)\n", "\n", "\n", "def row_year_match(text):\n", " if not FILTER_YEARS_ANY:\n", " return True\n", " text = str(text)\n", " return any(str(year) in text for year in FILTER_YEARS_ANY)\n", "\n", "\n", "def parse_datasetid(url):\n", " if pd.isna(url):\n", " return None\n", " parsed = urlparse(str(url))\n", " qs = parse_qs(parsed.query)\n", " values = qs.get('datasetid')\n", " return values[0] if values else None\n", "\n", "\n", "def clean_filename_from_url(url, fmt):\n", " base = unquote(str(url).split('?')[0]).rstrip('/')\n", " name = base.split('/')[-1]\n", " if not name:\n", " name = 'rdh_download.zip'\n", " suffix = Path(name).suffix\n", " if not suffix:\n", " suffix = '.zip' if str(fmt).upper() == 'SHP' else ''\n", " name = name + suffix\n", " return re.sub(r'[^A-Za-z0-9._-]+', '_', name)\n", "\n", "\n", "work = listing.copy()\n", "for required in ['Title', 'Format', 'URL']:\n", " if required not in work.columns:\n", " raise RuntimeError(f'RDH listing is missing expected column: {required}')\n", "\n", "work['title_format'] = work['Title'].fillna('').astype(str) + ' ' + 
work['Format'].fillna('').astype(str)\n", "work['datasetid'] = work['URL'].map(parse_datasetid)\n", "work['format_norm'] = work['Format'].fillna('').astype(str).str.upper()\n", "work['filename'] = work.apply(lambda r: clean_filename_from_url(r['URL'], r['Format']), axis=1)\n", "\n", "filtered = work[\n", " work['title_format'].map(lambda x: text_contains_all(x, FILTER_TERMS_ALL))\n", " & work['title_format'].map(lambda x: text_contains_any(x, FILTER_TERMS_ANY))\n", " & work['title_format'].map(row_year_match)\n", " & work['format_norm'].isin(PREFERRED_FORMATS)\n", "].copy()\n", "\n", "filtered = filtered.sort_values(['query_state_code', 'Title', 'Format', 'filename']).reset_index(drop=True)\n", "print(f'Filtered candidate files: {len(filtered):,}')\n", "display(filtered[['query_state_code', 'Title', 'Format', 'datasetid', 'filename', 'URL']].head(100))\n" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "## Download Candidate Files" ] }, { "cell_type": "code", "execution_count": null, "id": "12", "metadata": {}, "outputs": [], "source": [ "def layer_id_for_row(row):\n", " key = '|'.join([\n", " str(row.get('query_state_code', '')),\n", " str(row.get('Title', '')),\n", " str(row.get('Format', '')),\n", " str(row.get('datasetid', '')),\n", " str(row.get('URL', '')),\n", " ])\n", " return hashlib.sha1(key.encode('utf-8')).hexdigest()[:16]\n", "\n", "\n", "def download_rdh_file(row):\n", " base_url = str(row['URL']).split('?')[0]\n", " params = {\n", " 'username': RDH_USERNAME,\n", " 'password': RDH_PASSWORD,\n", " }\n", " if row.get('datasetid'):\n", " params['datasetid'] = row['datasetid']\n", "\n", " state_dir = RAW_DIR / row['query_state_code']\n", " state_dir.mkdir(parents=True, exist_ok=True)\n", " out_path = state_dir / row['filename']\n", " if out_path.exists() and not OVERWRITE_DOWNLOADS:\n", " return out_path, 'exists'\n", "\n", " response = requests.get(base_url, params=params, timeout=300)\n", " response.raise_for_status()\n", " 
out_path.write_bytes(response.content)\n", " time.sleep(REQUEST_SLEEP_SECONDS)\n", " return out_path, 'downloaded'\n", "\n", "\n", "download_rows = []\n", "for row in filtered.to_dict('records'):\n", " layer_id = layer_id_for_row(row)\n", " out_path = RAW_DIR / row['query_state_code'] / row['filename']\n", " status = 'planned'\n", " if DOWNLOAD_FILES:\n", " print('Retrieving', row['query_state_code'], row['Title'], row['filename'])\n", " out_path, status = download_rdh_file(row)\n", " elif out_path.exists():\n", " status = 'exists'\n", " download_rows.append({\n", " 'layer_id': layer_id,\n", " 'state_code': row['query_state_code'],\n", " 'title': row['Title'],\n", " 'format': row['Format'],\n", " 'datasetid': row.get('datasetid'),\n", " 'source_url': row['URL'],\n", " 'filename': row['filename'],\n", " 'local_path': str(out_path),\n", " 'download_status': status,\n", " })\n", "\n", "manifest = pd.DataFrame(download_rows)\n", "manifest.to_csv(MANIFEST_PATH, index=False)\n", "print('Wrote manifest:', MANIFEST_PATH)\n", "display(manifest.head(100))\n", "display(manifest['download_status'].value_counts(dropna=False).rename_axis('status').reset_index(name='rows'))\n", "\n", "if not DOWNLOAD_FILES and not manifest['local_path'].map(lambda p: Path(str(p)).exists()).any():\n", " print('No RDH shapefiles have been downloaded yet. 
Review the candidates above, then set DOWNLOAD_FILES=True and rerun this cell and the cells below.')\n" ] }, { "cell_type": "markdown", "id": "13", "metadata": {}, "source": [ "## Unpack and Find Spatial Files" ] }, { "cell_type": "code", "execution_count": null, "id": "14", "metadata": {}, "outputs": [], "source": [ "def extract_archive(path, layer_id):\n", " path = Path(path)\n", " extract_to = EXTRACT_DIR / layer_id\n", " extract_to.mkdir(parents=True, exist_ok=True)\n", " if path.suffix.lower() == '.zip':\n", " with zipfile.ZipFile(path) as zf:\n", " zf.extractall(extract_to)\n", " else:\n", " shutil.copy2(path, extract_to / path.name)\n", " return extract_to\n", "\n", "\n", "def find_spatial_file(folder):\n", " folder = Path(folder)\n", " candidates = (\n", " list(folder.rglob('*.shp')) +\n", " list(folder.rglob('*.geojson')) +\n", " list(folder.rglob('*.gpkg'))\n", " )\n", " if not candidates:\n", " return None\n", " candidates = sorted(candidates, key=lambda p: (p.suffix.lower() != '.shp', len(str(p))))\n", " return candidates[0]\n", "\n", "\n", "if MANIFEST_PATH.exists():\n", " manifest = pd.read_csv(MANIFEST_PATH)\n", "\n", "spatial_rows = []\n", "for row in manifest.to_dict('records'):\n", " local_path = Path(str(row['local_path']))\n", " if not local_path.exists():\n", " spatial_rows.append({**row, 'extract_dir': None, 'spatial_path': None, 'spatial_status': 'missing_download'})\n", " continue\n", " extract_dir = extract_archive(local_path, row['layer_id'])\n", " spatial_path = find_spatial_file(extract_dir)\n", " spatial_rows.append({\n", " **row,\n", " 'extract_dir': str(extract_dir),\n", " 'spatial_path': str(spatial_path) if spatial_path else None,\n", " 'spatial_status': 'found' if spatial_path else 'no_spatial_file',\n", " })\n", "\n", "spatial_manifest = pd.DataFrame(spatial_rows)\n", "spatial_manifest.to_csv(DATA_DIR / 'rdh_precinct_vote_spatial_manifest.csv', index=False)\n", "display(spatial_manifest[['state_code', 'title', 'filename', 
'spatial_status', 'spatial_path']].head(100))\n", "display(spatial_manifest['spatial_status'].value_counts(dropna=False).rename_axis('status').reset_index(name='rows'))\n", "\n", "if not spatial_manifest['spatial_status'].eq('found').any():\n", " print('No spatial files were found. If spatial_status is missing_download, set DOWNLOAD_FILES=True and rerun the download/unpack cells.')\n" ] }, { "cell_type": "markdown", "id": "15", "metadata": {}, "source": [ "## Create PostGIS Staging Tables" ] }, { "cell_type": "code", "execution_count": null, "id": "16", "metadata": {}, "outputs": [], "source": [ "CREATE_TABLES_SQL = f'''\n", "create table if not exists {LAYER_TABLE} (\n", " layer_id text primary key,\n", " state_code text,\n", " title text,\n", " format text,\n", " datasetid text,\n", " source_url text,\n", " filename text,\n", " local_path text,\n", " spatial_path text,\n", " metadata jsonb,\n", " loaded_at timestamptz not null default now()\n", ");\n", "\n", "create table if not exists {FEATURE_TABLE} (\n", " feature_id text primary key,\n", " layer_id text not null references public.rdh_precinct_vote_layers(layer_id) on delete cascade,\n", " state_code text,\n", " source_row integer,\n", " properties jsonb,\n", " geom geometry(MultiPolygon, 4326)\n", ");\n", "\n", "create index if not exists rdh_precinct_vote_features_geom_gix\n", " on {FEATURE_TABLE} using gist (geom);\n", "create index if not exists rdh_precinct_vote_features_layer_idx\n", " on {FEATURE_TABLE} (layer_id);\n", "create index if not exists rdh_precinct_vote_features_state_idx\n", " on {FEATURE_TABLE} (state_code);\n", "\n", "create table if not exists {MATCH_TABLE} (\n", " master_id text not null references public.master_data_centers(master_id) on delete cascade,\n", " feature_id text not null references public.rdh_precinct_vote_features(feature_id) on delete cascade,\n", " layer_id text not null references public.rdh_precinct_vote_layers(layer_id) on delete cascade,\n", " state_code text,\n", " 
join_method text not null,\n", " match_distance_m double precision,\n", " matched_at timestamptz not null default now(),\n", " primary key (master_id, feature_id)\n", ");\n", "create index if not exists data_center_rdh_precinct_vote_matches_master_idx\n", " on {MATCH_TABLE} (master_id);\n", "create index if not exists data_center_rdh_precinct_vote_matches_layer_idx\n", " on {MATCH_TABLE} (layer_id);\n", "'''\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(CREATE_TABLES_SQL)\n", " print('PostGIS staging tables are ready.')\n" ] }, { "cell_type": "markdown", "id": "17", "metadata": {}, "source": [ "## Load Precinct Geometries to PostGIS" ] }, { "cell_type": "code", "execution_count": null, "id": "18", "metadata": {}, "outputs": [], "source": [ "def json_safe(value):\n", " if isinstance(value, (pd.Timestamp, np.datetime64)):\n", " return str(value)\n", " if isinstance(value, np.generic):\n", " return value.item()\n", " if pd.isna(value):\n", " return None\n", " return value\n", "\n", "\n", "def stable_feature_id(layer_id, source_row, geom):\n", " geom_hash = hashlib.sha1(geom.wkb).hexdigest()[:16] if geom is not None else 'nogeom'\n", " return hashlib.sha1(f'{layer_id}|{source_row}|{geom_hash}'.encode('utf-8')).hexdigest()[:24]\n", "\n", "\n", "def load_layer_to_postgis_with_ogr(row):\n", " spatial_path = row.get('spatial_path')\n", " if not spatial_path or not Path(spatial_path).exists():\n", " return {'layer_id': row['layer_id'], 'status': 'missing_spatial_file', 'features': 0}\n", "\n", " ogr2ogr_path = shutil.which('ogr2ogr')\n", " if not ogr2ogr_path:\n", " return {'layer_id': row['layer_id'], 'status': 'missing_geopandas_and_ogr2ogr', 'features': 0}\n", "\n", " staging_table = '_rdh_import_' + re.sub(r'[^a-zA-Z0-9_]', '_', row['layer_id']).lower()\n", " dsn = 'PG:host={host} port={port} user={user} dbname={dbname}'.format(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " 
user=os.environ['PGWEB_USER'],\n", " dbname=DB_NAME,\n", " )\n", " env = os.environ.copy()\n", " env['PGPASSWORD'] = os.environ['PGWEB_PASSWORD']\n", " cmd = [\n", " ogr2ogr_path,\n", " '-f', 'PostgreSQL',\n", " dsn,\n", " str(spatial_path),\n", " '-nln', f'public.{staging_table}',\n", " '-overwrite',\n", " '-t_srs', 'EPSG:4326',\n", " '-nlt', 'PROMOTE_TO_MULTI',\n", " '-dim', 'XY',\n", " '-lco', 'GEOMETRY_NAME=geom',\n", " ]\n", " result = subprocess.run(cmd, env=env, capture_output=True, text=True)\n", " if result.returncode != 0:\n", " return {\n", " 'layer_id': row['layer_id'],\n", " 'status': 'ogr2ogr_failed',\n", " 'features': 0,\n", " 'error': (result.stderr or result.stdout)[-1000:],\n", " }\n", "\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('''\n", " select column_name\n", " from information_schema.columns\n", " where table_schema = 'public' and table_name = %s\n", " order by ordinal_position\n", " ''', (staging_table,))\n", " columns = [r[0] for r in cur.fetchall()]\n", " geom_col = 'geom' if 'geom' in columns else 'wkb_geometry'\n", " order_col = 'ogc_fid' if 'ogc_fid' in columns else 'ctid'\n", "\n", " metadata = {\n", " 'columns': columns,\n", " 'source_spatial_path': str(spatial_path),\n", " 'loader': 'ogr2ogr',\n", " }\n", " cur.execute(sql.SQL('''\n", " insert into {layers} (\n", " layer_id, state_code, title, format, datasetid, source_url,\n", " filename, local_path, spatial_path, metadata, loaded_at\n", " ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())\n", " on conflict (layer_id) do update set\n", " state_code = excluded.state_code,\n", " title = excluded.title,\n", " format = excluded.format,\n", " datasetid = excluded.datasetid,\n", " source_url = excluded.source_url,\n", " filename = excluded.filename,\n", " local_path = excluded.local_path,\n", " spatial_path = excluded.spatial_path,\n", " metadata = excluded.metadata,\n", " loaded_at = now()\n", " ''').format(layers=sql.SQL(LAYER_TABLE)), (\n", " 
row['layer_id'], row['state_code'], row['title'], row['format'],\n", " row.get('datasetid'), row['source_url'], row['filename'],\n", " row['local_path'], spatial_path, Json(metadata)\n", " ))\n", " cur.execute(sql.SQL('delete from {} where layer_id = %s').format(sql.SQL(FEATURE_TABLE)), (row['layer_id'],))\n", " insert_sql = sql.SQL('''\n", " insert into {features} (feature_id, layer_id, state_code, source_row, properties, geom)\n", " with src as (\n", " select\n", " row_number() over (order by {order_col})::integer as source_row,\n", " to_jsonb(t) - %s - 'ogc_fid' - 'fid' as properties,\n", " ST_Force2D({geom_col}) as geom\n", " from {staging} t\n", " where {geom_col} is not null\n", " ), fixed as (\n", " select\n", " source_row,\n", " properties,\n", " -- keep each multi-part precinct as one row so feature_id (layer_id/source_row) stays unique\n", " ST_Multi(ST_CollectionExtract(ST_MakeValid(geom), 3)) as geom\n", " from src\n", " )\n", " select\n", " %s || '/' || source_row::text,\n", " %s,\n", " %s,\n", " source_row,\n", " properties,\n", " ST_Multi(ST_Force2D(ST_SetSRID(geom, 4326)))::geometry(MultiPolygon, 4326)\n", " from fixed\n", " where geom is not null and not ST_IsEmpty(geom)\n", " ''').format(\n", " features=sql.SQL(FEATURE_TABLE),\n", " staging=sql.Identifier('public', staging_table),\n", " geom_col=sql.Identifier(geom_col),\n", " order_col=sql.SQL(order_col) if order_col == 'ctid' else sql.Identifier(order_col),\n", " )\n", " cur.execute(insert_sql, (geom_col, row['layer_id'], row['layer_id'], row['state_code']))\n", " inserted = cur.rowcount\n", " cur.execute(sql.SQL('drop table if exists {}').format(sql.Identifier('public', staging_table)))\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(FEATURE_TABLE)))\n", " return {'layer_id': row['layer_id'], 'status': 'loaded_ogr2ogr', 'features': inserted}\n", "\n", "\n", "def load_layer_to_postgis(row):\n", " if not HAS_GEOPANDAS:\n", " return load_layer_to_postgis_with_ogr(row)\n", " spatial_path = row.get('spatial_path')\n", " if not spatial_path or not 
Path(spatial_path).exists():\n", " return {'layer_id': row['layer_id'], 'status': 'missing_spatial_file', 'features': 0}\n", "\n", " gdf = gpd.read_file(spatial_path)\n", " if gdf.empty:\n", " return {'layer_id': row['layer_id'], 'status': 'empty_file', 'features': 0}\n", " if gdf.crs is None:\n", " print(f'Warning: {spatial_path} has no CRS; assuming EPSG:4326.')\n", " gdf = gdf.set_crs(4326)\n", " else:\n", " gdf = gdf.to_crs(4326)\n", "\n", " gdf = gdf[gdf.geometry.notna()].copy()\n", " gdf = gdf[~gdf.geometry.is_empty].copy()\n", " if gdf.empty:\n", " return {'layer_id': row['layer_id'], 'status': 'no_valid_geometry', 'features': 0}\n", "\n", " layer_metadata = {\n", " 'columns': [str(c) for c in gdf.columns],\n", " 'row_count': int(len(gdf)),\n", " 'source_spatial_path': spatial_path,\n", " }\n", "\n", " feature_rows = []\n", " prop_cols = [c for c in gdf.columns if c != gdf.geometry.name]\n", " for source_row, (_, record) in enumerate(gdf.iterrows(), start=1):\n", " geom = record.geometry\n", " if geom is None or geom.is_empty:\n", " continue\n", " properties = {str(col): json_safe(record[col]) for col in prop_cols}\n", " feature_rows.append((\n", " stable_feature_id(row['layer_id'], source_row, geom),\n", " row['layer_id'],\n", " row['state_code'],\n", " source_row,\n", " Json(properties),\n", " json.dumps(geom.__geo_interface__),\n", " ))\n", "\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(sql.SQL('''\n", " insert into {layers} (\n", " layer_id, state_code, title, format, datasetid, source_url,\n", " filename, local_path, spatial_path, metadata, loaded_at\n", " ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())\n", " on conflict (layer_id) do update set\n", " state_code = excluded.state_code,\n", " title = excluded.title,\n", " format = excluded.format,\n", " datasetid = excluded.datasetid,\n", " source_url = excluded.source_url,\n", " filename = excluded.filename,\n", " local_path = excluded.local_path,\n", " 
spatial_path = excluded.spatial_path,\n", " metadata = excluded.metadata,\n", " loaded_at = now()\n", " ''').format(layers=sql.SQL(LAYER_TABLE)), (\n", " row['layer_id'], row['state_code'], row['title'], row['format'],\n", " row.get('datasetid'), row['source_url'], row['filename'],\n", " row['local_path'], spatial_path, Json(layer_metadata)\n", " ))\n", " cur.execute(sql.SQL('delete from {} where layer_id = %s').format(sql.SQL(FEATURE_TABLE)), (row['layer_id'],))\n", " if feature_rows:\n", " execute_values(\n", " cur,\n", " sql.SQL('''\n", " insert into {features} (\n", " feature_id, layer_id, state_code, source_row, properties, geom\n", " ) values %s\n", " ''').format(features=sql.SQL(FEATURE_TABLE)).as_string(conn),\n", " feature_rows,\n", " template='(%s, %s, %s, %s, %s, ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Force2D(ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326))), 3)))',\n", " page_size=1000,\n", " )\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(FEATURE_TABLE)))\n", " return {'layer_id': row['layer_id'], 'status': 'loaded', 'features': len(feature_rows)}\n", "\n", "\n", "SKIP_ALREADY_LOADED_LAYERS = True\n", "\n", "load_results = []\n", "if LOAD_TO_POSTGIS:\n", " found = spatial_manifest[spatial_manifest['spatial_status'].eq('found')].copy()\n", " already_loaded = set()\n", " if SKIP_ALREADY_LOADED_LAYERS:\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(f'select layer_id from {LAYER_TABLE}')\n", " already_loaded = {r[0] for r in cur.fetchall()}\n", " print(f'Skipping {len(already_loaded)} layer(s) already in PostGIS')\n", " for row in found.to_dict('records'):\n", " if row['layer_id'] in already_loaded:\n", " load_results.append({'layer_id': row['layer_id'], 'status': 'already_loaded', 'features': None})\n", " continue\n", " print('Loading layer:', row['state_code'], row['title'])\n", " load_results.append(load_layer_to_postgis(row))\n", "else:\n", " print('LOAD_TO_POSTGIS=False; skipping spatial file load.')\n", 
"\n", "load_results = pd.DataFrame(load_results)\n", "display(load_results)\n" ] }, { "cell_type": "markdown", "id": "19", "metadata": {}, "source": [ "## Match Data Centers to Precinct Layers" ] }, { "cell_type": "code", "execution_count": null, "id": "20", "metadata": {}, "outputs": [], "source": [ "MATCH_SQL = f'''\n", "insert into {MATCH_TABLE} (\n", " master_id, feature_id, layer_id, state_code, join_method, match_distance_m, matched_at\n", ")\n", "select\n", " m.master_id,\n", " f.feature_id,\n", " f.layer_id,\n", " f.state_code,\n", " 'point_in_precinct' as join_method,\n", " 0.0 as match_distance_m,\n", " now()\n", "from {MASTER_TABLE} m\n", "join {FEATURE_TABLE} f\n", " on m.geom && f.geom\n", " and ST_Covers(f.geom, m.geom)\n", "where m.geom is not null\n", "on conflict (master_id, feature_id) do update set\n", " join_method = excluded.join_method,\n", " match_distance_m = excluded.match_distance_m,\n", " matched_at = now()\n", "'''\n", "\n", "if RUN_SPATIAL_MATCH:\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(MATCH_SQL)\n", " print('Inserted/updated matches:', cur.rowcount)\n", " cur.execute(sql.SQL('analyze {}').format(sql.SQL(MATCH_TABLE)))\n", "else:\n", " print('RUN_SPATIAL_MATCH=False; skipping point-in-precinct matching.')\n" ] }, { "cell_type": "markdown", "id": "21", "metadata": {}, "source": [ "## Match Diagnostics" ] }, { "cell_type": "code", "execution_count": null, "id": "22", "metadata": {}, "outputs": [], "source": [ "diagnostics_sql = f'''\n", "with dc as (\n", " select state, count(*) as data_centers, ST_Extent(geom) as dc_extent\n", " from {MASTER_TABLE}\n", " where geom is not null\n", " group by state\n", "), feats as (\n", " select state_code as state, count(*) as precinct_features, ST_Extent(geom) as precinct_extent\n", " from {FEATURE_TABLE}\n", " where geom is not null\n", " group by state_code\n", "), matches as (\n", " select state_code as state, count(distinct master_id) as 
matched_data_centers\n",
    "    from {MATCH_TABLE}\n",
    "    group by state_code\n",
    ")\n",
    "select\n",
    "    coalesce(dc.state, feats.state, matches.state) as state,\n",
    "    coalesce(dc.data_centers, 0) as data_centers,\n",
    "    coalesce(feats.precinct_features, 0) as precinct_features,\n",
    "    coalesce(matches.matched_data_centers, 0) as matched_data_centers,\n",
    "    dc.dc_extent::text as data_center_extent,\n",
    "    feats.precinct_extent::text as precinct_extent\n",
    "from dc\n",
    "full join feats on feats.state = dc.state\n",
    "full join matches on matches.state = coalesce(dc.state, feats.state)\n",
    "order by state\n",
    "'''\n",
    "\n",
    "nearest_sql = f'''\n",
    "select\n",
    "    m.master_id,\n",
    "    m.name,\n",
    "    m.city,\n",
    "    m.state,\n",
    "    nearest.layer_id,\n",
    "    nearest.feature_id,\n",
    "    nearest.distance_m\n",
    "from {MASTER_TABLE} m\n",
    "left join {MATCH_TABLE} matched on matched.master_id = m.master_id\n",
    "left join lateral (\n",
    "    select\n",
    "        f.layer_id,\n",
    "        f.feature_id,\n",
    "        ST_Distance(m.geom::geography, f.geom::geography) as distance_m\n",
    "    from {FEATURE_TABLE} f\n",
    "    where f.geom is not null\n",
    "    order by m.geom <-> f.geom\n",
    "    limit 1\n",
    ") nearest on true\n",
    "where m.geom is not null\n",
    "    and matched.master_id is null\n",
    "order by nearest.distance_m nulls last\n",
    "limit 50\n",
    "'''\n",
    "\n",
    "with get_conn() as conn:\n",
    "    diagnostics = pd.read_sql_query(diagnostics_sql, conn)\n",
    "    nearest_unmatched = pd.read_sql_query(nearest_sql, conn)\n",
    "\n",
    "display(diagnostics)\n",
    "display(nearest_unmatched)\n",
    "\n",
    "if diagnostics['precinct_features'].sum() == 0:\n",
    "    print('PostGIS has zero RDH precinct features loaded. Run the download, unpack, and load cells before matching.')\n",
    "elif diagnostics['matched_data_centers'].sum() == 0:\n",
    "    print('Precinct features exist, but no point-in-polygon matches were found. 
Check the extents and nearest unmatched distances above.')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "23",
   "metadata": {},
   "source": [
    "## Optional Nearest-Precinct Fallback"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "24",
   "metadata": {},
   "outputs": [],
   "source": [
    "NEAREST_FALLBACK_SQL = f'''\n",
    "insert into {MATCH_TABLE} (\n",
    "    master_id, feature_id, layer_id, state_code, join_method, match_distance_m, matched_at\n",
    ")\n",
    "with candidates as (\n",
    "    select m.master_id, m.geom, upper(m.state) as state_code\n",
    "    from {MASTER_TABLE} m\n",
    "    where m.geom is not null\n",
    "        and m.state is not null\n",
    "        and upper(m.state) in (select distinct state_code from {FEATURE_TABLE} where geom is not null)\n",
    "        and not exists (\n",
    "            select 1 from {MATCH_TABLE} existing where existing.master_id = m.master_id\n",
    "        )\n",
    ")\n",
    "select\n",
    "    c.master_id,\n",
    "    nearest.feature_id,\n",
    "    nearest.layer_id,\n",
    "    nearest.state_code,\n",
    "    'nearest_precinct_within_threshold' as join_method,\n",
    "    nearest.distance_m,\n",
    "    now()\n",
    "from candidates c\n",
    "join lateral (\n",
    "    select\n",
    "        f.feature_id,\n",
    "        f.layer_id,\n",
    "        f.state_code,\n",
    "        ST_Distance(c.geom::geography, f.geom::geography) as distance_m\n",
    "    from {FEATURE_TABLE} f\n",
    "    where f.geom is not null\n",
    "        and f.state_code = c.state_code\n",
    "    order by c.geom <-> f.geom\n",
    "    limit 1\n",
    ") nearest on true\n",
    "where nearest.distance_m <= %s\n",
    "on conflict (master_id, feature_id) do update set\n",
    "    join_method = excluded.join_method,\n",
    "    match_distance_m = excluded.match_distance_m,\n",
    "    matched_at = now()\n",
    "'''\n",
    "\n",
    "if RUN_SPATIAL_MATCH and RUN_NEAREST_PRECINCT_FALLBACK:\n",
    "    with get_conn() as conn:\n",
    "        with conn.cursor() as cur:\n",
    "            cur.execute(NEAREST_FALLBACK_SQL, (NEAREST_PRECINCT_MAX_DISTANCE_M,))\n",
    "            print('Inserted/updated nearest fallback matches:', cur.rowcount)\n",
    "            cur.execute(sql.SQL('analyze {}').format(sql.SQL(MATCH_TABLE)))\n",
    "else:\n",
    "    print('Nearest 
fallback disabled.')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "25",
   "metadata": {},
   "source": [
    "## Inspect Matches and Census-Tract Context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "26",
   "metadata": {},
   "outputs": [],
   "source": [
    "match_summary_sql = f'''\n",
    "select\n",
    "    l.state_code,\n",
    "    l.title,\n",
    "    count(distinct f.feature_id) as precinct_features,\n",
    "    count(distinct m.master_id) as matched_data_centers\n",
    "from {LAYER_TABLE} l\n",
    "left join {FEATURE_TABLE} f on f.layer_id = l.layer_id\n",
    "left join {MATCH_TABLE} m on m.feature_id = f.feature_id\n",
    "group by l.state_code, l.title\n",
    "order by l.state_code, matched_data_centers desc, l.title\n",
    "'''\n",
    "\n",
    "sample_sql = f'''\n",
    "select\n",
    "    m.master_id,\n",
    "    dc.name,\n",
    "    dc.city,\n",
    "    dc.state,\n",
    "    l.title as rdh_layer_title,\n",
    "    f.properties as precinct_properties\n",
    "from {MATCH_TABLE} m\n",
    "join {MASTER_TABLE} dc on dc.master_id = m.master_id\n",
    "join {FEATURE_TABLE} f on f.feature_id = m.feature_id\n",
    "join {LAYER_TABLE} l on l.layer_id = m.layer_id\n",
    "order by dc.state, dc.city, dc.name\n",
    "limit 25\n",
    "'''\n",
    "\n",
    "with get_conn() as conn:\n",
    "    match_summary = pd.read_sql_query(match_summary_sql, conn)\n",
    "    sample_matches = pd.read_sql_query(sample_sql, conn)\n",
    "\n",
    "display(match_summary)\n",
    "display(sample_matches)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "27",
   "metadata": {},
   "outputs": [],
   "source": [
    "with get_conn() as conn:\n",
    "    with conn.cursor() as cur:\n",
    "        cur.execute('select to_regclass(%s)', (CENSUS_TRACT_TABLE,))\n",
    "        has_census_tract_table = cur.fetchone()[0] is not None\n",
    "\n",
    "if has_census_tract_table:\n",
    "    tract_context_sql = f'''\n",
    "    select\n",
    "        dc.master_id,\n",
    "        dc.name,\n",
    "        dc.city,\n",
    "        dc.state,\n",
    "        dc.geoid as census_tract_geoid,\n",
    "        count(pm.feature_id) as precinct_layer_matches\n",
    "    from {MASTER_TABLE} dc\n",
    "    left join {CENSUS_TRACT_TABLE} ct on ct.geoid = dc.geoid\n",
    "    left 
join {MATCH_TABLE} pm on pm.master_id = dc.master_id\n",
    "    group by dc.master_id, dc.name, dc.city, dc.state, dc.geoid\n",
    "    order by precinct_layer_matches asc, dc.state, dc.city\n",
    "    limit 50\n",
    "    '''\n",
    "    with get_conn() as conn:\n",
    "        tract_context = pd.read_sql_query(tract_context_sql, conn)\n",
    "    display(tract_context)\n",
    "else:\n",
    "    print(f'{CENSUS_TRACT_TABLE} not found; census tract fallback/context skipped.')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "28",
   "metadata": {},
   "source": [
    "## Next Refinement: Tidy Vote Columns\n",
    "\n",
    "The RDH staging table intentionally stores each precinct row's original attributes in `properties jsonb`. Once layers are loaded and matched, inspect the `precinct_properties` column in the sample output above to identify the vote-column naming patterns for the states and years you care about.\n",
    "\n",
    "Useful follow-up views can then extract fields such as:\n",
    "- precinct identifier/name\n",
    "- election year\n",
    "- office\n",
    "- Democratic votes\n",
    "- Republican votes\n",
    "- total votes\n",
    "- turnout or vote share\n",
    "\n",
    "Add that extraction only after confirming which RDH file families the filters actually select, because vote-column names differ between them.\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.14.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}