{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# PostGIS Table Loader Notebook\n", "\n", "Use this notebook to load additional tabular files into the same PostgreSQL/PostGIS database used by the scripts in this folder.\n", "\n", "Expected environment variables (same pattern as your .py files):\n", "- `PGWEB_HOST`, `PGWEB_PORT`, `PGWEB_USER`, `PGWEB_PASSWORD`\n", "- Optional override: `PGDATABASE` (defaults to `data_centers`)" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "import os\n", "from pathlib import Path\n", "from typing import List, Tuple\n", "\n", "import pandas as pd\n", "import psycopg2\n", "from psycopg2 import sql\n", "from psycopg2.extras import execute_values\n", "\n", "print('pandas:', pd.__version__)\n", "print('psycopg2 loaded successfully')" ] }, { "cell_type": "code", "execution_count": null, "id": "2", "metadata": {}, "outputs": [], "source": [ "# Load DB env vars for notebook kernels (which often do not inherit shell exports).\n", "def load_env_file(env_path: str = '.env') -> None:\n", " p = Path(env_path)\n", " if not p.exists():\n", " print(f'No {env_path} file found in {Path.cwd()}')\n", " return\n", "\n", " loaded = 0\n", " for raw_line in p.read_text(encoding='utf-8').splitlines():\n", " line = raw_line.strip()\n", " if not line or line.startswith('#') or '=' not in line:\n", " continue\n", " key, value = line.split('=', 1)\n", " key = key.strip()\n", " value = value.strip().strip('\"').strip(\"'\")\n", " if key and key not in os.environ:\n", " os.environ[key] = value\n", " loaded += 1\n", " print(f'Loaded {loaded} env var(s) from {env_path}')\n", "\n", "\n", "def require_env(keys):\n", " missing = [k for k in keys if not os.getenv(k)]\n", " if missing:\n", " raise EnvironmentError(\n", " 'Missing required env vars in notebook kernel: ' + ', '.join(missing) +\n", " '.\\nSet them in this notebook, or add them to a .env file in this folder.'\n", " )\n", "\n", "\n", "load_env_file('.env')\n", "print('PGWEB_HOST:', os.getenv('PGWEB_HOST', ''))\n", "print('PGWEB_PORT:', os.getenv('PGWEB_PORT', ''))\n", "print('PGWEB_USER:', os.getenv('PGWEB_USER', ''))" ] }, { "cell_type": "code", "execution_count": null, "id": "3", "metadata": {}, "outputs": [], "source": [ "# Connection setup: mirrors the existing scripts in this repository.\n", "required_keys = ['PGWEB_HOST', 'PGWEB_PORT', 'PGWEB_USER', 'PGWEB_PASSWORD']\n", "require_env(required_keys)\n", "\n", "DB_NAME = os.getenv('PGDATABASE', 'data_centers')\n", "\n", "def get_conn():\n", " return psycopg2.connect(\n", " host=os.environ['PGWEB_HOST'],\n", " port=os.environ['PGWEB_PORT'],\n", " user=os.environ['PGWEB_USER'],\n", " password=os.environ['PGWEB_PASSWORD'],\n", " dbname=\"data_centers\",\n", " )\n", "\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select current_database(), current_user, version()')\n", " db, usr, ver = cur.fetchone()\n", " print('Connected to DB:', db)\n", " print('As user:', usr)\n", " print('Postgres:', ver.split(',')[0])\n", " cur.execute('create extension if not exists postgis')\n", " print('PostGIS extension is available')" ] }, { "cell_type": "code", "execution_count": null, "id": "4", "metadata": {}, "outputs": [], "source": [ "def parse_table_name(table_fqn: str) -> Tuple[str, str]:\n", " table_fqn = table_fqn.strip()\n", " if '.' in table_fqn:\n", " schema, table = table_fqn.split('.', 1)\n", " else:\n", " schema, table = 'public', table_fqn\n", " return schema, table\n", "\n", "\n", "def clean_column_name(name: str) -> str:\n", " s = str(name).strip().lower()\n", " out = []\n", " last_was_us = False\n", " for ch in s:\n", " keep = ch.isalnum() or ch == '_'\n", " c = ch if keep else '_'\n", " if c == '_':\n", " if last_was_us:\n", " continue\n", " last_was_us = True\n", " else:\n", " last_was_us = False\n", " out.append(c)\n", " cleaned = ''.join(out).strip('_')\n", " if not cleaned:\n", " cleaned = 'col'\n", " if cleaned[0].isdigit():\n", " cleaned = 'c_' + cleaned\n", " return cleaned\n", "\n", "\n", "def uniquify_columns(cols: List[str]) -> List[str]:\n", " seen = {}\n", " result = []\n", " for c in cols:\n", " base = clean_column_name(c)\n", " idx = seen.get(base, 0)\n", " if idx == 0:\n", " result.append(base)\n", " else:\n", " result.append(f'{base}_{idx}')\n", " seen[base] = idx + 1\n", " return result\n", "\n", "\n", "def postgres_type_for_series(s: pd.Series) -> str:\n", " dt = s.dtype\n", " if pd.api.types.is_integer_dtype(dt):\n", " return 'bigint'\n", " if pd.api.types.is_float_dtype(dt):\n", " return 'double precision'\n", " if pd.api.types.is_bool_dtype(dt):\n", " return 'boolean'\n", " if pd.api.types.is_datetime64_any_dtype(dt):\n", " return 'timestamp'\n", " return 'text'\n", "\n", "\n", "def read_tabular(path: str, sheet_name=0) -> pd.DataFrame:\n", " p = Path(path)\n", " suffix = p.suffix.lower()\n", " if suffix == '.csv':\n", " return pd.read_csv(p)\n", " if suffix in ['.xlsx', '.xls']:\n", " return pd.read_excel(p, sheet_name=sheet_name)\n", " if suffix == '.jsonl':\n", " return pd.read_json(p, lines=True)\n", " if suffix == '.json':\n", " return pd.read_json(p)\n", " if suffix == '.parquet':\n", " return pd.read_parquet(p)\n", " raise ValueError(f'Unsupported file type: {suffix}')" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "def load_dataframe_to_postgis(\n", " df: pd.DataFrame,\n", " table_fqn: str,\n", " if_exists: str = 'replace', # replace | append | fail\n", " batch_size: int = 5000,\n", " analyze: bool = True,\n", ") -> None:\n", " if if_exists not in {'replace', 'append', 'fail'}:\n", " raise ValueError(\"if_exists must be one of: replace, append, fail\")\n", "\n", " schema, table = parse_table_name(table_fqn)\n", "\n", " work = df.copy()\n", " work.columns = uniquify_columns([str(c) for c in work.columns])\n", "\n", " # Convert pandas NaN/NaT to Python None for psycopg2.\n", " work = work.astype(object).where(pd.notna(work), None)\n", "\n", " col_defs = []\n", " for col in work.columns:\n", " pg_type = postgres_type_for_series(df[col]) if col in df.columns else 'text'\n", " col_defs.append((col, pg_type))\n", "\n", " rows = [tuple(row) for row in work.itertuples(index=False, name=None)]\n", "\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('create extension if not exists postgis')\n", "\n", " cur.execute(\n", " 'select to_regclass(%s)',\n", " (f'{schema}.{table}',),\n", " )\n", " exists = cur.fetchone()[0] is not None\n", "\n", " if exists and if_exists == 'fail':\n", " raise RuntimeError(f'Table {schema}.{table} already exists')\n", "\n", " if exists and if_exists == 'replace':\n", " cur.execute(\n", " sql.SQL('drop table {}.{}').format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " )\n", " )\n", " exists = False\n", "\n", " if not exists:\n", " ddl_cols = [\n", " sql.SQL('{} {}').format(sql.Identifier(c), sql.SQL(t))\n", " for c, t in col_defs\n", " ]\n", " create_sql = sql.SQL('create table {}.{} ({})').format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " sql.SQL(', ').join(ddl_cols),\n", " )\n", " cur.execute(create_sql)\n", "\n", " if rows:\n", " insert_sql = sql.SQL('insert into {}.{} ({}) values %s').format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " sql.SQL(', ').join(sql.Identifier(c) for c in work.columns),\n", " )\n", " execute_values(cur, insert_sql.as_string(cur), rows, page_size=batch_size)\n", "\n", " if analyze:\n", " cur.execute(\n", " sql.SQL('analyze {}.{}').format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " )\n", " )\n", "\n", " conn.commit()\n", "\n", " print(f'Loaded {len(rows)} rows into {schema}.{table}')\n", " print('Columns:', ', '.join(work.columns))" ] }, { "cell_type": "code", "execution_count": null, "id": "6", "metadata": {}, "outputs": [], "source": [ "def add_point_geometry(\n", " table_fqn: str,\n", " lon_col: str = 'longitude',\n", " lat_col: str = 'latitude',\n", " geom_col: str = 'geom',\n", " srid: int = 4326,\n", ") -> None:\n", " schema, table = parse_table_name(table_fqn)\n", "\n", " with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " cur.execute(\n", " sql.SQL('alter table {}.{} add column if not exists {} geometry(Point, %s)').format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " sql.Identifier(geom_col),\n", " ),\n", " (srid,),\n", " )\n", "\n", " cur.execute(\n", " sql.SQL(\n", " 'update {}.{} set {} = case '\n", " 'when {} is not null and {} is not null '\n", " 'then ST_SetSRID(ST_MakePoint({}::double precision, {}::double precision), %s) '\n", " 'else null end'\n", " ).format(\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " sql.Identifier(geom_col),\n", " sql.Identifier(lon_col),\n", " sql.Identifier(lat_col),\n", " sql.Identifier(lon_col),\n", " sql.Identifier(lat_col),\n", " ),\n", " (srid,),\n", " )\n", "\n", " idx_name = f'{table}_{geom_col}_gix'\n", " cur.execute(\n", " sql.SQL('create index if not exists {} on {}.{} using gist ({})').format(\n", " sql.Identifier(idx_name),\n", " sql.Identifier(schema),\n", " sql.Identifier(table),\n", " sql.Identifier(geom_col),\n", " )\n", " )\n", " cur.execute(\n", " sql.SQL('analyze {}.{}').format(sql.Identifier(schema), sql.Identifier(table))\n", " )\n", " conn.commit()\n", "\n", " print(f'Geometry built in {schema}.{table}.{geom_col} and GiST indexed')" ] }, { "cell_type": "code", "execution_count": null, "id": "7", "metadata": {}, "outputs": [], "source": [ "# Utility rate tracker loader: research-ready state-level table for data-center analyses.\n", "FILE_PATH = 'new/MarchUtilityRateTrackerTable-Downloadable-Excel.xlsx'\n", "SHEET_NAME = 'List of Utilities'\n", "TARGET_TABLE = 'public.utility_rate_tracker_2025_2028'\n", "IF_EXISTS = 'replace' # replace | append | fail\n", "\n", "STATE_NAME_TO_CODE = {\n", " 'Alabama': 'AL', 'Alaska': 'AK', 'Arizona': 'AZ', 'Arkansas': 'AR', 'California': 'CA',\n", " 'Colorado': 'CO', 'Connecticut': 'CT', 'Delaware': 'DE', 'District Of Columbia': 'DC',\n", " 'Florida': 'FL', 'Georgia': 'GA', 'Hawaii': 'HI', 'Idaho': 'ID', 'Illinois': 'IL',\n", " 'Indiana': 'IN', 'Iowa': 'IA', 'Kansas': 'KS', 'Kentucky': 'KY', 'Louisiana': 'LA',\n", " 'Maine': 'ME', 'Maryland': 'MD', 'Massachusetts': 'MA', 'Michigan': 'MI',\n", " 'Minnesota': 'MN', 'Mississippi': 'MS', 'Missouri': 'MO', 'Montana': 'MT',\n", " 'Nebraska': 'NE', 'Nevada': 'NV', 'New Hampshire': 'NH', 'New Jersey': 'NJ',\n", " 'New Mexico': 'NM', 'New York': 'NY', 'North Carolina': 'NC', 'North Dakota': 'ND',\n", " 'Ohio': 'OH', 'Oklahoma': 'OK', 'Oregon': 'OR', 'Pennsylvania': 'PA',\n", " 'Rhode Island': 'RI', 'South Carolina': 'SC', 'South Dakota': 'SD', 'Tennessee': 'TN',\n", " 'Texas': 'TX', 'Utah': 'UT', 'Vermont': 'VT', 'Virginia': 'VA', 'Washington': 'WA',\n", " 'West Virginia': 'WV', 'Wisconsin': 'WI', 'Wyoming': 'WY'\n", "}\n", "\n", "\n", "def parse_effective_date(value):\n", " if pd.isna(value):\n", " return pd.NaT\n", " text = str(value).strip()\n", " if not text or text.upper() == 'N/A':\n", " return pd.NaT\n", "\n", " numeric = pd.to_numeric(text, errors='coerce')\n", " if pd.notna(numeric) and numeric >= 20000 and numeric <= 70000:\n", " return pd.to_datetime(numeric, unit='D', origin='1899-12-30', errors='coerce')\n", "\n", " return pd.to_datetime(text, errors='coerce')\n", "\n", "\n", "def detect_utility_header_row(raw_sheet: pd.DataFrame) -> int:\n", " required = {'utility provider', 'state', 'electric or gas bill'}\n", " max_scan = min(len(raw_sheet), 20)\n", " for i in range(max_scan):\n", " row_values = {\n", " str(v).strip().lower()\n", " for v in raw_sheet.iloc[i].tolist()\n", " if pd.notna(v) and str(v).strip()\n", " }\n", " if required.issubset(row_values):\n", " return i\n", " raise ValueError('Could not detect utility table header row in workbook')\n", "\n", "\n", "def build_utility_rate_tracker_frame(xlsx_path: str) -> pd.DataFrame:\n", " # Read without a fixed header so we can detect the real header row robustly.\n", " raw_sheet = pd.read_excel(\n", " xlsx_path,\n", " sheet_name=SHEET_NAME,\n", " header=None,\n", " engine='openpyxl',\n", " )\n", " header_row = detect_utility_header_row(raw_sheet)\n", "\n", " raw = pd.read_excel(\n", " xlsx_path,\n", " sheet_name=SHEET_NAME,\n", " header=header_row,\n", " engine='openpyxl',\n", " )\n", "\n", " # Some files include leading blank/unnamed columns before the real table.\n", " raw = raw.loc[:, ~raw.columns.astype(str).str.startswith('Unnamed')]\n", "\n", " source_to_target = {\n", " 'Utility Provider': 'utility_provider',\n", " 'State': 'state_name',\n", " 'Electric or gas bill': 'service_type',\n", " '# of customers': 'customer_count',\n", " 'Total revenue increase, 2025–2028': 'total_revenue_increase_2025_2028',\n", " 'Time Period': 'time_period',\n", " 'Monthly increase amount': 'monthly_increase_amount',\n", " 'Monthly % increase': 'monthly_pct_increase',\n", " 'Effective date': 'effective_date_raw',\n", " 'Status': 'status',\n", " }\n", " missing = [c for c in source_to_target if c not in raw.columns]\n", " if missing:\n", " raise ValueError(f'Missing expected utility columns: {missing}')\n", "\n", " raw = raw[list(source_to_target.keys())].rename(columns=source_to_target)\n", "\n", " for col in ['utility_provider', 'state_name', 'service_type', 'time_period', 'status']:\n", " raw[col] = raw[col].astype(str).str.strip()\n", "\n", " raw = raw[raw['utility_provider'].notna() & (raw['utility_provider'] != '')]\n", " raw = raw[raw['utility_provider'].str.lower() != 'nan']\n", "\n", " raw['state_name'] = raw['state_name'].str.title()\n", " raw['state_code'] = raw['state_name'].map(STATE_NAME_TO_CODE)\n", " raw['state_id'] = raw['state_code'].map(STATE_FIPS)\n", "\n", " raw['customer_count'] = pd.to_numeric(raw['customer_count'], errors='coerce')\n", " raw['total_revenue_increase_2025_2028'] = pd.to_numeric(raw['total_revenue_increase_2025_2028'], errors='coerce')\n", " raw['monthly_increase_amount'] = pd.to_numeric(raw['monthly_increase_amount'], errors='coerce')\n", "\n", " pct = pd.to_numeric(raw['monthly_pct_increase'], errors='coerce')\n", " raw['monthly_pct_increase_ratio'] = pct.where((pct <= 1) | pct.isna(), pct / 100.0)\n", "\n", " raw['effective_date'] = raw['effective_date_raw'].apply(parse_effective_date)\n", " raw['source_file'] = Path(xlsx_path).name\n", "\n", " ordered_cols = [\n", " 'utility_provider',\n", " 'state_name',\n", " 'state_code',\n", " 'state_id',\n", " 'service_type',\n", " 'customer_count',\n", " 'total_revenue_increase_2025_2028',\n", " 'time_period',\n", " 'monthly_increase_amount',\n", " 'monthly_pct_increase_ratio',\n", " 'effective_date',\n", " 'effective_date_raw',\n", " 'status',\n", " 'source_file',\n", " ]\n", " return raw[ordered_cols]\n", "\n", "\n", "df = build_utility_rate_tracker_frame(FILE_PATH)\n", "print('Rows:', len(df), 'Cols:', len(df.columns))\n", "print('States:', df['state_code'].nunique())\n", "print('Service types:', sorted(df['service_type'].dropna().unique().tolist()))\n", "print('Rows missing state_code:', int(df['state_code'].isna().sum()))\n", "display(df.head(10))\n", "\n", "load_dataframe_to_postgis(df, TARGET_TABLE, if_exists=IF_EXISTS)" ] }, { "cell_type": "code", "execution_count": null, "id": "8", "metadata": {}, "outputs": [], "source": [ "# QA: show any state names that did not map to USPS abbreviations.\n", "unmatched = df[df['state_code'].isna()][['state_name', 'utility_provider', 'service_type', 'time_period', 'status']].copy()\n", "print('Unmatched rows:', len(unmatched))\n", "if len(unmatched):\n", " display(unmatched.drop_duplicates().sort_values(['state_name', 'utility_provider']).reset_index(drop=True))" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "# Optional: this table has lon/lat columns, so build geometry with those names.\n", "# add_point_geometry(TARGET_TABLE, lon_col='lon', lat_col='lat', geom_col='geom', srid=4326)" ] }, { "cell_type": "code", "execution_count": null, "id": "10", "metadata": {}, "outputs": [], "source": [ "# Quick sanity check: show row count and latest tables in public schema.\n", "with get_conn() as conn:\n", " with conn.cursor() as cur:\n", " schema, table = parse_table_name(TARGET_TABLE)\n", " cur.execute(\n", " sql.SQL('select count(*) from {}.{}').format(sql.Identifier(schema), sql.Identifier(table))\n", " )\n", " print('Target row count:', cur.fetchone()[0])\n", "\n", " cur.execute(\n", " \"\"\"\n", " select schemaname, tablename\n", " from pg_tables\n", " where schemaname = 'public'\n", " order by tablename desc\n", " limit 25\n", " \"\"\"\n", " )\n", " for row in cur.fetchall():\n", " print(f'{row[0]}.{row[1]}')" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.14.5" } }, "nbformat": 4, "nbformat_minor": 5 }