Files
data-centers/postgis_table_loader.ipynb

588 lines
22 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"id": "0",
"metadata": {},
"source": [
"# PostGIS Table Loader Notebook\n",
"\n",
"Use this notebook to load additional tabular files into the same PostgreSQL/PostGIS database used by the scripts in this folder.\n",
"\n",
"Expected environment variables (same pattern as your .py files):\n",
"- `PGWEB_HOST`, `PGWEB_PORT`, `PGWEB_USER`, `PGWEB_PASSWORD`\n",
"- Optional override: `PGDATABASE` (defaults to `data_centers`)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from pathlib import Path\n",
"from typing import List, Tuple\n",
"\n",
"import pandas as pd\n",
"import psycopg2\n",
"from psycopg2 import sql\n",
"from psycopg2.extras import execute_values\n",
"\n",
"print('pandas:', pd.__version__)\n",
"print('psycopg2 loaded successfully')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"# Load DB env vars for notebook kernels (which often do not inherit shell exports).\n",
"def load_env_file(env_path: str = '.env') -> None:\n",
" p = Path(env_path)\n",
" if not p.exists():\n",
" print(f'No {env_path} file found in {Path.cwd()}')\n",
" return\n",
"\n",
" loaded = 0\n",
" for raw_line in p.read_text(encoding='utf-8').splitlines():\n",
" line = raw_line.strip()\n",
" if not line or line.startswith('#') or '=' not in line:\n",
" continue\n",
" key, value = line.split('=', 1)\n",
" key = key.strip()\n",
" value = value.strip().strip('\"').strip(\"'\")\n",
" if key and key not in os.environ:\n",
" os.environ[key] = value\n",
" loaded += 1\n",
" print(f'Loaded {loaded} env var(s) from {env_path}')\n",
"\n",
"\n",
"def require_env(keys):\n",
" missing = [k for k in keys if not os.getenv(k)]\n",
" if missing:\n",
" raise EnvironmentError(\n",
" 'Missing required env vars in notebook kernel: ' + ', '.join(missing) +\n",
" '.\\nSet them in this notebook, or add them to a .env file in this folder.'\n",
" )\n",
"\n",
"\n",
"load_env_file('.env')\n",
"print('PGWEB_HOST:', os.getenv('PGWEB_HOST', '<not set>'))\n",
"print('PGWEB_PORT:', os.getenv('PGWEB_PORT', '<not set>'))\n",
"print('PGWEB_USER:', os.getenv('PGWEB_USER', '<not set>'))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"# Connection setup: mirrors the existing scripts in this repository.\n",
"required_keys = ['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD']\n",
"require_env(required_keys)\n",
"\n",
"DB_NAME = os.getenv('PGDATABASE', 'data_centers')\n",
"\n",
"def get_conn():\n",
"    \"\"\"Open a new psycopg2 connection to ``DB_NAME`` using the PGWEB_* environment variables.\n",
"\n",
"    Raises ``KeyError`` if one of the PGWEB_* variables is not set.\n",
"    The caller owns the returned connection and should close it: psycopg2's\n",
"    ``with conn:`` block only commits or rolls back the transaction.\n",
"    \"\"\"\n",
"    return psycopg2.connect(\n",
"        host=os.environ['PGWEB_HOST'],\n",
"        port=os.environ['PGWEB_PORT'],\n",
"        user=os.environ['PGWEB_USER'],\n",
"        password=os.environ['PGWEB_PASSWORD'],\n",
"        # Use DB_NAME so the PGDATABASE override is honoured instead of a hard-coded name.\n",
"        dbname=DB_NAME,\n",
"    )\n",
"\n",
"# Smoke-test the connection and make sure the PostGIS extension exists in the target database.\n",
"conn = get_conn()\n",
"try:\n",
"    # `with conn` wraps one transaction (commit on success, rollback on error);\n",
"    # it does not close the connection, hence the explicit close() below.\n",
"    with conn:\n",
"        with conn.cursor() as cur:\n",
"            cur.execute('select current_database(), current_user, version()')\n",
"            db, usr, ver = cur.fetchone()\n",
"            print('Connected to DB:', db)\n",
"            print('As user:', usr)\n",
"            print('Postgres:', ver.split(',')[0])\n",
"            cur.execute('create extension if not exists postgis')\n",
"            print('PostGIS extension is available')\n",
"finally:\n",
"    conn.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"def parse_table_name(table_fqn: str) -> Tuple[str, str]:\n",
" table_fqn = table_fqn.strip()\n",
" if '.' in table_fqn:\n",
" schema, table = table_fqn.split('.', 1)\n",
" else:\n",
" schema, table = 'public', table_fqn\n",
" return schema, table\n",
"\n",
"\n",
"def clean_column_name(name: str) -> str:\n",
" s = str(name).strip().lower()\n",
" out = []\n",
" last_was_us = False\n",
" for ch in s:\n",
" keep = ch.isalnum() or ch == '_'\n",
" c = ch if keep else '_'\n",
" if c == '_':\n",
" if last_was_us:\n",
" continue\n",
" last_was_us = True\n",
" else:\n",
" last_was_us = False\n",
" out.append(c)\n",
" cleaned = ''.join(out).strip('_')\n",
" if not cleaned:\n",
" cleaned = 'col'\n",
" if cleaned[0].isdigit():\n",
" cleaned = 'c_' + cleaned\n",
" return cleaned\n",
"\n",
"\n",
"def uniquify_columns(cols: List[str]) -> List[str]:\n",
"    \"\"\"Clean every name with ``clean_column_name`` and make the results unique.\n",
"\n",
"    The first occurrence of a cleaned name is kept as-is; later occurrences get\n",
"    ``_1``, ``_2``, ... appended. A suffix is skipped when the suffixed name is\n",
"    already taken, so ``['a', 'a', 'a_1']`` yields three distinct names instead\n",
"    of two columns both called ``a_1``.\n",
"\n",
"    Returns the new names in the same order as ``cols``.\n",
"    \"\"\"\n",
"    used = set()\n",
"    last_suffix = {}  # cleaned base name -> highest numeric suffix tried so far\n",
"    result = []\n",
"    for c in cols:\n",
"        base = clean_column_name(c)\n",
"        candidate = base\n",
"        suffix = last_suffix.get(base, 0)\n",
"        while candidate in used:\n",
"            suffix += 1\n",
"            candidate = f'{base}_{suffix}'\n",
"        last_suffix[base] = suffix\n",
"        used.add(candidate)\n",
"        result.append(candidate)\n",
"    return result\n",
"\n",
"\n",
"def postgres_type_for_series(s: pd.Series) -> str:\n",
" dt = s.dtype\n",
" if pd.api.types.is_integer_dtype(dt):\n",
" return 'bigint'\n",
" if pd.api.types.is_float_dtype(dt):\n",
" return 'double precision'\n",
" if pd.api.types.is_bool_dtype(dt):\n",
" return 'boolean'\n",
" if pd.api.types.is_datetime64_any_dtype(dt):\n",
" return 'timestamp'\n",
" return 'text'\n",
"\n",
"\n",
"def read_tabular(path: str, sheet_name=0) -> pd.DataFrame:\n",
" p = Path(path)\n",
" suffix = p.suffix.lower()\n",
" if suffix == '.csv':\n",
" return pd.read_csv(p)\n",
" if suffix in ['.xlsx', '.xls']:\n",
" return pd.read_excel(p, sheet_name=sheet_name)\n",
" if suffix == '.jsonl':\n",
" return pd.read_json(p, lines=True)\n",
" if suffix == '.json':\n",
" return pd.read_json(p)\n",
" if suffix == '.parquet':\n",
" return pd.read_parquet(p)\n",
" raise ValueError(f'Unsupported file type: {suffix}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5",
"metadata": {},
"outputs": [],
"source": [
"def load_dataframe_to_postgis(\n",
"    df: pd.DataFrame,\n",
"    table_fqn: str,\n",
"    if_exists: str = 'replace',  # replace | append | fail\n",
"    batch_size: int = 5000,\n",
"    analyze: bool = True,\n",
") -> None:\n",
"    \"\"\"Create (or reuse) a table and bulk-insert ``df`` into it.\n",
"\n",
"    Column names are cleaned and de-duplicated with ``uniquify_columns`` and each\n",
"    column's PostgreSQL type is inferred from the original pandas dtype.\n",
"\n",
"    Args:\n",
"        df: Frame to load; it is not modified.\n",
"        table_fqn: ``schema.table`` or a bare table name (schema ``public``).\n",
"        if_exists: ``replace`` drops and recreates an existing table, ``append``\n",
"            inserts into the existing table, ``fail`` raises if it exists.\n",
"        batch_size: ``page_size`` passed to ``execute_values``.\n",
"        analyze: Run ``analyze`` on the table after the insert.\n",
"\n",
"    Raises:\n",
"        ValueError: ``if_exists`` is not one of the three supported modes.\n",
"        RuntimeError: the table already exists and ``if_exists='fail'``.\n",
"    \"\"\"\n",
"    if if_exists not in {'replace', 'append', 'fail'}:\n",
"        raise ValueError(\"if_exists must be one of: replace, append, fail\")\n",
"\n",
"    schema, table = parse_table_name(table_fqn)\n",
"    cleaned_cols = uniquify_columns([str(c) for c in df.columns])\n",
"\n",
"    # Infer types by position from the ORIGINAL frame. Looking columns up by their\n",
"    # cleaned name would miss every column whose name changed during cleaning\n",
"    # (silently typing it as text) and would break on duplicate source names.\n",
"    col_defs = [\n",
"        (cleaned, postgres_type_for_series(df.iloc[:, pos]))\n",
"        for pos, cleaned in enumerate(cleaned_cols)\n",
"    ]\n",
"\n",
"    work = df.copy()\n",
"    work.columns = cleaned_cols\n",
"    # Convert pandas NaN/NaT to Python None for psycopg2.\n",
"    work = work.astype(object).where(pd.notna(work), None)\n",
"    rows = list(work.itertuples(index=False, name=None))\n",
"\n",
"    qualified = sql.SQL('{}.{}').format(sql.Identifier(schema), sql.Identifier(table))\n",
"\n",
"    conn = get_conn()\n",
"    try:\n",
"        # `with conn` commits on success and rolls back on error; it does not\n",
"        # close the connection, which is why close() sits in the finally block.\n",
"        with conn:\n",
"            with conn.cursor() as cur:\n",
"                cur.execute('create extension if not exists postgis')\n",
"\n",
"                # Pass the quoted name so the existence check resolves the same\n",
"                # case-preserved identifier that drop/create use below.\n",
"                cur.execute('select to_regclass(%s)', (qualified.as_string(cur),))\n",
"                exists = cur.fetchone()[0] is not None\n",
"\n",
"                if exists and if_exists == 'fail':\n",
"                    raise RuntimeError(f'Table {schema}.{table} already exists')\n",
"\n",
"                if exists and if_exists == 'replace':\n",
"                    cur.execute(sql.SQL('drop table {}').format(qualified))\n",
"                    exists = False\n",
"\n",
"                if not exists:\n",
"                    ddl_cols = [\n",
"                        sql.SQL('{} {}').format(sql.Identifier(c), sql.SQL(t))\n",
"                        for c, t in col_defs\n",
"                    ]\n",
"                    cur.execute(\n",
"                        sql.SQL('create table {} ({})').format(\n",
"                            qualified,\n",
"                            sql.SQL(', ').join(ddl_cols),\n",
"                        )\n",
"                    )\n",
"\n",
"                if rows:\n",
"                    insert_sql = sql.SQL('insert into {} ({}) values %s').format(\n",
"                        qualified,\n",
"                        sql.SQL(', ').join(sql.Identifier(c) for c in cleaned_cols),\n",
"                    )\n",
"                    execute_values(cur, insert_sql.as_string(cur), rows, page_size=batch_size)\n",
"\n",
"                if analyze:\n",
"                    cur.execute(sql.SQL('analyze {}').format(qualified))\n",
"    finally:\n",
"        conn.close()\n",
"\n",
"    print(f'Loaded {len(rows)} rows into {schema}.{table}')\n",
"    print('Columns:', ', '.join(work.columns))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6",
"metadata": {},
"outputs": [],
"source": [
"def add_point_geometry(\n",
"    table_fqn: str,\n",
"    lon_col: str = 'longitude',\n",
"    lat_col: str = 'latitude',\n",
"    geom_col: str = 'geom',\n",
"    srid: int = 4326,\n",
") -> None:\n",
"    \"\"\"Add a Point geometry column built from longitude/latitude columns and index it.\n",
"\n",
"    Steps, all in one transaction: add ``geom_col`` if it is missing, set it for\n",
"    every row (NULL when either coordinate is NULL), create a GiST index named\n",
"    ``<table>_<geom_col>_gix`` if it is missing, then ``analyze`` the table.\n",
"\n",
"    Args:\n",
"        table_fqn: ``schema.table`` or a bare table name (schema ``public``).\n",
"        lon_col: Column holding longitude; cast to ``double precision``.\n",
"        lat_col: Column holding latitude; cast to ``double precision``.\n",
"        geom_col: Name of the geometry column to add and populate.\n",
"        srid: SRID used for the column type and for ``ST_SetSRID``.\n",
"    \"\"\"\n",
"    schema, table = parse_table_name(table_fqn)\n",
"    qualified = sql.SQL('{}.{}').format(sql.Identifier(schema), sql.Identifier(table))\n",
"    geom = sql.Identifier(geom_col)\n",
"    lon = sql.Identifier(lon_col)\n",
"    lat = sql.Identifier(lat_col)\n",
"\n",
"    conn = get_conn()\n",
"    try:\n",
"        # `with conn` commits on success and rolls back on error; it does not\n",
"        # close the connection, which is why close() sits in the finally block.\n",
"        with conn:\n",
"            with conn.cursor() as cur:\n",
"                cur.execute(\n",
"                    sql.SQL('alter table {} add column if not exists {} geometry(Point, %s)').format(\n",
"                        qualified,\n",
"                        geom,\n",
"                    ),\n",
"                    (srid,),\n",
"                )\n",
"\n",
"                cur.execute(\n",
"                    sql.SQL(\n",
"                        'update {tbl} set {geom} = case '\n",
"                        'when {lon} is not null and {lat} is not null '\n",
"                        'then ST_SetSRID(ST_MakePoint({lon}::double precision, {lat}::double precision), %s) '\n",
"                        'else null end'\n",
"                    ).format(tbl=qualified, geom=geom, lon=lon, lat=lat),\n",
"                    (srid,),\n",
"                )\n",
"\n",
"                idx_name = f'{table}_{geom_col}_gix'\n",
"                cur.execute(\n",
"                    sql.SQL('create index if not exists {} on {} using gist ({})').format(\n",
"                        sql.Identifier(idx_name),\n",
"                        qualified,\n",
"                        geom,\n",
"                    )\n",
"                )\n",
"                cur.execute(sql.SQL('analyze {}').format(qualified))\n",
"    finally:\n",
"        conn.close()\n",
"\n",
"    print(f'Geometry built in {schema}.{table}.{geom_col} and GiST indexed')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7",
"metadata": {},
"outputs": [],
"source": [
"# Utility rate tracker loader: research-ready state-level table for data-center analyses.\n",
"FILE_PATH = 'new/MarchUtilityRateTrackerTable-Downloadable-Excel.xlsx'\n",
"SHEET_NAME = 'List of Utilities'\n",
"TARGET_TABLE = 'public.utility_rate_tracker_2025_2028'\n",
"IF_EXISTS = 'replace' # replace | append | fail\n",
"\n",
"STATE_NAME_TO_CODE = {\n",
" 'Alabama': 'AL', 'Alaska': 'AK', 'Arizona': 'AZ', 'Arkansas': 'AR', 'California': 'CA',\n",
" 'Colorado': 'CO', 'Connecticut': 'CT', 'Delaware': 'DE', 'District Of Columbia': 'DC',\n",
" 'Florida': 'FL', 'Georgia': 'GA', 'Hawaii': 'HI', 'Idaho': 'ID', 'Illinois': 'IL',\n",
" 'Indiana': 'IN', 'Iowa': 'IA', 'Kansas': 'KS', 'Kentucky': 'KY', 'Louisiana': 'LA',\n",
" 'Maine': 'ME', 'Maryland': 'MD', 'Massachusetts': 'MA', 'Michigan': 'MI',\n",
" 'Minnesota': 'MN', 'Mississippi': 'MS', 'Missouri': 'MO', 'Montana': 'MT',\n",
" 'Nebraska': 'NE', 'Nevada': 'NV', 'New Hampshire': 'NH', 'New Jersey': 'NJ',\n",
" 'New Mexico': 'NM', 'New York': 'NY', 'North Carolina': 'NC', 'North Dakota': 'ND',\n",
" 'Ohio': 'OH', 'Oklahoma': 'OK', 'Oregon': 'OR', 'Pennsylvania': 'PA',\n",
" 'Rhode Island': 'RI', 'South Carolina': 'SC', 'South Dakota': 'SD', 'Tennessee': 'TN',\n",
" 'Texas': 'TX', 'Utah': 'UT', 'Vermont': 'VT', 'Virginia': 'VA', 'Washington': 'WA',\n",
" 'West Virginia': 'WV', 'Wisconsin': 'WI', 'Wyoming': 'WY'\n",
"}\n",
"\n",
"\n",
"def parse_effective_date(value):\n",
" if pd.isna(value):\n",
" return pd.NaT\n",
" text = str(value).strip()\n",
" if not text or text.upper() == 'N/A':\n",
" return pd.NaT\n",
"\n",
" numeric = pd.to_numeric(text, errors='coerce')\n",
" if pd.notna(numeric) and numeric >= 20000 and numeric <= 70000:\n",
" return pd.to_datetime(numeric, unit='D', origin='1899-12-30', errors='coerce')\n",
"\n",
" return pd.to_datetime(text, errors='coerce')\n",
"\n",
"\n",
"def detect_utility_header_row(raw_sheet: pd.DataFrame) -> int:\n",
" required = {'utility provider', 'state', 'electric or gas bill'}\n",
" max_scan = min(len(raw_sheet), 20)\n",
" for i in range(max_scan):\n",
" row_values = {\n",
" str(v).strip().lower()\n",
" for v in raw_sheet.iloc[i].tolist()\n",
" if pd.notna(v) and str(v).strip()\n",
" }\n",
" if required.issubset(row_values):\n",
" return i\n",
" raise ValueError('Could not detect utility table header row in workbook')\n",
"\n",
"\n",
"# Two-letter state code -> 2-digit state FIPS code, stored as zero-padded text.\n",
"# NOTE(review): nothing in this notebook defined STATE_FIPS, so state_id could not be built;\n",
"# confirm that text codes (not integers) match the state_id type of the tables this joins to.\n",
"STATE_FIPS = {\n",
"    'AL': '01', 'AK': '02', 'AZ': '04', 'AR': '05', 'CA': '06', 'CO': '08', 'CT': '09',\n",
"    'DE': '10', 'DC': '11', 'FL': '12', 'GA': '13', 'HI': '15', 'ID': '16', 'IL': '17',\n",
"    'IN': '18', 'IA': '19', 'KS': '20', 'KY': '21', 'LA': '22', 'ME': '23', 'MD': '24',\n",
"    'MA': '25', 'MI': '26', 'MN': '27', 'MS': '28', 'MO': '29', 'MT': '30', 'NE': '31',\n",
"    'NV': '32', 'NH': '33', 'NJ': '34', 'NM': '35', 'NY': '36', 'NC': '37', 'ND': '38',\n",
"    'OH': '39', 'OK': '40', 'OR': '41', 'PA': '42', 'RI': '44', 'SC': '45', 'SD': '46',\n",
"    'TN': '47', 'TX': '48', 'UT': '49', 'VT': '50', 'VA': '51', 'WA': '53', 'WV': '54',\n",
"    'WI': '55', 'WY': '56',\n",
"}\n",
"\n",
"\n",
"def _normalize_header(name) -> str:\n",
"    \"\"\"Lower-case a header and keep only letters and digits, so case, spacing and dash variants compare equal.\"\"\"\n",
"    return ''.join(ch for ch in str(name).lower() if ch.isalnum())\n",
"\n",
"\n",
"def build_utility_rate_tracker_frame(xlsx_path: str) -> pd.DataFrame:\n",
"    \"\"\"Read the utility rate tracker sheet and return a cleaned, load-ready frame.\n",
"\n",
"    The sheet named by ``SHEET_NAME`` is read twice: once without a header to\n",
"    locate the header row, then again using that row as the header. Columns are\n",
"    renamed to snake_case, text columns are trimmed, rows without a utility\n",
"    provider are dropped, and derived columns are added: ``state_code``,\n",
"    ``state_id``, ``monthly_pct_increase_ratio``, ``effective_date`` and\n",
"    ``source_file``.\n",
"\n",
"    Raises:\n",
"        ValueError: the header row or an expected column cannot be found.\n",
"    \"\"\"\n",
"    # Read without a fixed header so we can detect the real header row robustly.\n",
"    raw_sheet = pd.read_excel(\n",
"        xlsx_path,\n",
"        sheet_name=SHEET_NAME,\n",
"        header=None,\n",
"        engine='openpyxl',\n",
"    )\n",
"    header_row = detect_utility_header_row(raw_sheet)\n",
"\n",
"    raw = pd.read_excel(\n",
"        xlsx_path,\n",
"        sheet_name=SHEET_NAME,\n",
"        header=header_row,\n",
"        engine='openpyxl',\n",
"    )\n",
"\n",
"    # Some files include leading blank/unnamed columns before the real table.\n",
"    raw = raw.loc[:, ~raw.columns.astype(str).str.startswith('Unnamed')]\n",
"\n",
"    source_to_target = {\n",
"        'Utility Provider': 'utility_provider',\n",
"        'State': 'state_name',\n",
"        'Electric or gas bill': 'service_type',\n",
"        '# of customers': 'customer_count',\n",
"        'Total revenue increase, 20252028': 'total_revenue_increase_2025_2028',\n",
"        'Time Period': 'time_period',\n",
"        'Monthly increase amount': 'monthly_increase_amount',\n",
"        'Monthly % increase': 'monthly_pct_increase',\n",
"        'Effective date': 'effective_date_raw',\n",
"        'Status': 'status',\n",
"    }\n",
"\n",
"    # Resolve each expected header to an actual column: exact match first, then a\n",
"    # match on the normalized form. Header detection already ignores case and\n",
"    # padding, and the year range in the workbook may be written with a dash\n",
"    # character (e.g. '2025-2028') that the key above does not contain.\n",
"    by_normalized = {}\n",
"    for actual in raw.columns:\n",
"        by_normalized.setdefault(_normalize_header(actual), actual)\n",
"\n",
"    rename_map = {}\n",
"    missing = []\n",
"    for expected, target in source_to_target.items():\n",
"        if expected in raw.columns:\n",
"            actual = expected\n",
"        else:\n",
"            actual = by_normalized.get(_normalize_header(expected))\n",
"        if actual is None:\n",
"            missing.append(expected)\n",
"        else:\n",
"            rename_map[actual] = target\n",
"    if missing:\n",
"        raise ValueError(f'Missing expected utility columns: {missing}')\n",
"\n",
"    # copy() so the column assignments below never write into a view of the sheet.\n",
"    raw = raw[list(rename_map)].rename(columns=rename_map).copy()\n",
"\n",
"    for col in ['utility_provider', 'state_name', 'service_type', 'time_period', 'status']:\n",
"        # Trim text but keep missing cells missing; a plain astype(str) would\n",
"        # turn them into the literal string 'nan' and load that into the table.\n",
"        raw[col] = raw[col].astype(str).str.strip().where(raw[col].notna())\n",
"\n",
"    has_provider = (\n",
"        raw['utility_provider'].notna()\n",
"        & (raw['utility_provider'] != '')\n",
"        & (raw['utility_provider'].str.lower() != 'nan')\n",
"    )\n",
"    raw = raw[has_provider].copy()\n",
"\n",
"    raw['state_name'] = raw['state_name'].str.title()\n",
"    raw['state_code'] = raw['state_name'].map(STATE_NAME_TO_CODE)\n",
"    raw['state_id'] = raw['state_code'].map(STATE_FIPS)\n",
"\n",
"    raw['customer_count'] = pd.to_numeric(raw['customer_count'], errors='coerce')\n",
"    raw['total_revenue_increase_2025_2028'] = pd.to_numeric(raw['total_revenue_increase_2025_2028'], errors='coerce')\n",
"    raw['monthly_increase_amount'] = pd.to_numeric(raw['monthly_increase_amount'], errors='coerce')\n",
"\n",
"    # Values with magnitude <= 1 are taken as ratios already; larger magnitudes are\n",
"    # treated as whole-number percentages. abs() applies the same rule to decreases.\n",
"    pct = pd.to_numeric(raw['monthly_pct_increase'], errors='coerce')\n",
"    raw['monthly_pct_increase_ratio'] = pct.where((pct.abs() <= 1) | pct.isna(), pct / 100.0)\n",
"\n",
"    raw['effective_date'] = raw['effective_date_raw'].apply(parse_effective_date)\n",
"    raw['source_file'] = Path(xlsx_path).name\n",
"\n",
"    ordered_cols = [\n",
"        'utility_provider',\n",
"        'state_name',\n",
"        'state_code',\n",
"        'state_id',\n",
"        'service_type',\n",
"        'customer_count',\n",
"        'total_revenue_increase_2025_2028',\n",
"        'time_period',\n",
"        'monthly_increase_amount',\n",
"        'monthly_pct_increase_ratio',\n",
"        'effective_date',\n",
"        'effective_date_raw',\n",
"        'status',\n",
"        'source_file',\n",
"    ]\n",
"    return raw[ordered_cols]\n",
"\n",
"\n",
"df = build_utility_rate_tracker_frame(FILE_PATH)\n",
"print('Rows:', len(df), 'Cols:', len(df.columns))\n",
"print('States:', df['state_code'].nunique())\n",
"print('Service types:', sorted(df['service_type'].dropna().unique().tolist()))\n",
"print('Rows missing state_code:', int(df['state_code'].isna().sum()))\n",
"display(df.head(10))\n",
"\n",
"load_dataframe_to_postgis(df, TARGET_TABLE, if_exists=IF_EXISTS)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8",
"metadata": {},
"outputs": [],
"source": [
"# QA: list the rows whose state name has no entry in STATE_NAME_TO_CODE (state_code is missing).\n",
"qa_columns = ['state_name', 'utility_provider', 'service_type', 'time_period', 'status']\n",
"unmatched = df.loc[df['state_code'].isna(), qa_columns].copy()\n",
"print('Unmatched rows:', len(unmatched))\n",
"if not unmatched.empty:\n",
"    unmatched_view = unmatched.drop_duplicates().sort_values(['state_name', 'utility_provider'])\n",
"    display(unmatched_view.reset_index(drop=True))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9",
"metadata": {},
"outputs": [],
"source": [
"# Optional: only for tables that contain longitude/latitude columns (the utility rate tracker table built above has none).\n",
"# add_point_geometry(TARGET_TABLE, lon_col='lon', lat_col='lat', geom_col='geom', srid=4326)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "10",
"metadata": {},
"outputs": [],
"source": [
"# Quick sanity check: show the target row count and up to 25 tables in the public schema\n",
"# (ordered by name, descending).\n",
"conn = get_conn()\n",
"try:\n",
"    # `with conn` only wraps the transaction; the connection is closed in `finally`.\n",
"    with conn:\n",
"        with conn.cursor() as cur:\n",
"            schema, table = parse_table_name(TARGET_TABLE)\n",
"            cur.execute(\n",
"                sql.SQL('select count(*) from {}.{}').format(sql.Identifier(schema), sql.Identifier(table))\n",
"            )\n",
"            print('Target row count:', cur.fetchone()[0])\n",
"\n",
"            cur.execute(\n",
"                \"\"\"\n",
"                select schemaname, tablename\n",
"                from pg_tables\n",
"                where schemaname = 'public'\n",
"                order by tablename desc\n",
"                limit 25\n",
"                \"\"\"\n",
"            )\n",
"            for row in cur.fetchall():\n",
"                print(f'{row[0]}.{row[1]}')\n",
"finally:\n",
"    conn.close()"
]
},
{
"cell_type": "markdown",
"id": "11",
"metadata": {},
"source": [
"## Tables Created by This Notebook and Their Relationships\n",
"\n",
"### Tables Created / Maintained\n",
"1. `TARGET_TABLE` (configured at runtime)\n",
"- Generic loader output table built from the current dataframe schema.\n",
"- Replaced/appended according to `if_exists` behavior.\n",
"- Optional point geometry can be added in helper cells.\n",
"\n",
"### Key Relationships\n",
"- This notebook is table-agnostic: relationships depend on the selected `TARGET_TABLE` and source columns.\n",
"- When key columns (for example `master_id`, `geoid`, IDs, dates) are present, the loaded table can be joined to domain tables.\n",
"- When geometry is present, the loaded table can participate in spatial joins.\n",
"\n",
"### Rerun Notes\n",
"- Safe to rerun for recurring refreshes of different source files.\n",
"- Always confirm `TARGET_TABLE` and `if_exists` before execution to avoid unintended replacement of existing tables."
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}