Files
data-centers/scripts/load_postgis_data_centers.py
dadams 6db5e0fff8 Fix path references in scripts after reorganization
Update 8 scripts to use Path(__file__).parent.parent as PROJECT_ROOT
so they resolve data/, output/, and internet_cables/ relative to the
project root rather than the caller's working directory.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 21:57:47 -07:00

296 lines
8.5 KiB
Python

#!/usr/bin/env python3
import argparse
import csv
import os
from decimal import Decimal
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_values
# Directory two levels above this file; default inputs are resolved against it
# so the script does not depend on the caller's working directory.
PROJECT_ROOT = Path(__file__).parent.parent
# Default input files (kept as plain strings), one per supported --source value.
CSV_PATH = str(PROJECT_ROOT.joinpath("data", "US_DC_Sample_geocoded.csv"))
IM3_CSV_PATH = str(PROJECT_ROOT.joinpath("new", "IM3_Existing_DataCenters.csv"))
# Schema-qualified target table and the database it lives in.
TABLE = "public.us_dc_sample_geocoded"
DB_NAME = "data_centers"
# Every loadable column, in the order used for both the converted value tuples
# and the INSERT column list. Order matters: the two must stay aligned.
ALL_COLS = [
    # Identity and provider
    "id",
    "provider",
    "facility_name",
    "url",
    "provider_url",
    # Postal location and contact
    "country",
    "state",
    "state_code",
    "city",
    "postal_code",
    "street_address",
    "address",
    "source_address",
    "phone",
    # Facility attributes
    "area_sqft",
    "power_mw",
    "nearest_airport_miles",
    "has_bare_metal",
    "has_iaas",
    "has_internet_exchange",
    "has_colocation",
    "certifications",
    "content_summary",
    "path",
    # Coordinates and geocoding outcome
    "longitude",
    "latitude",
    "geocode_source",
    "geocode_precision",
    "geocode_status",
    "geocode_match_address",
    # Census geocoder details
    "census_status",
    "census_match_type",
    "census_input_address",
    "census_tiger_line_id",
    "census_side",
    # Nominatim geocoder details
    "nominatim_display_name",
    "nominatim_osm_type",
    "nominatim_osm_id",
]
# Columns that need a typed conversion before insert; any column not listed
# in one of these sets is passed through as text.
INT_COLS = {
    "area_sqft",
    "census_tiger_line_id",
    "nominatim_osm_id",
}
NUM_COLS = {
    "power_mw",
    "nearest_airport_miles",
    "longitude",
    "latitude",
}
BOOL_COLS = {"has_bare_metal", "has_iaas", "has_internet_exchange", "has_colocation"}
def to_int(value):
    """Convert a CSV cell to ``int``, or ``None`` when the cell is empty.

    The value is parsed through ``Decimal`` first so decimal-looking input
    such as ``"1200.0"`` is accepted; any fractional part is truncated.

    Args:
        value: Cell content (normally a string) or ``None``.

    Returns:
        The integer value, or ``None`` for ``None`` / ``""``.

    Raises:
        decimal.InvalidOperation: If the value is not a valid number.
    """
    return None if value in (None, "") else int(Decimal(value))
def to_decimal(value):
    """Convert a CSV cell to ``Decimal``, or ``None`` when the cell is empty.

    Args:
        value: Cell content (normally a string) or ``None``.

    Returns:
        ``Decimal(value)``, or ``None`` for ``None`` / ``""``.

    Raises:
        decimal.InvalidOperation: If the value is not a valid number.
    """
    if value in (None, ""):
        return None
    return Decimal(value)
def to_bool(value):
    """Convert a CSV cell to ``bool``, or ``None`` when the cell is empty.

    Integer-like input keeps its meaning (zero is ``False``, any other
    integer is ``True``). In addition, the textual spellings that PostgreSQL
    accepts for boolean input (``true``/``false``, ``t``/``f``, ``yes``/``no``,
    ``y``/``n``, ``on``/``off``) are recognized case-insensitively, so CSV
    exports that write ``True``/``False`` instead of ``1``/``0`` load too.

    Args:
        value: Cell content (normally a string) or ``None``.

    Returns:
        ``True``/``False``, or ``None`` for ``None`` / ``""``.

    Raises:
        ValueError: If the value is neither a recognized word nor an integer.
    """
    if value in (None, ""):
        return None
    if isinstance(value, str):
        token = value.strip().lower()
        if token in ("true", "t", "yes", "y", "on"):
            return True
        if token in ("false", "f", "no", "n", "off"):
            return False
    # Everything else must be integer-like, exactly as before.
    return bool(int(value))
def convert(row, column):
    """Return ``row[column]`` converted to the Python type for that column.

    Columns listed in ``INT_COLS``, ``NUM_COLS`` or ``BOOL_COLS`` go through
    ``to_int``, ``to_decimal`` or ``to_bool`` respectively. Any other column
    is treated as text: an empty string becomes ``None`` and every other
    value (including a missing key, which yields ``None``) is returned as is.

    Args:
        row: Mapping of column name to raw cell value.
        column: Name of the column to convert.
    """
    value = row.get(column)
    typed_converters = (
        (INT_COLS, to_int),
        (NUM_COLS, to_decimal),
        (BOOL_COLS, to_bool),
    )
    for columns, caster in typed_converters:
        if column in columns:
            return caster(value)
    if value == "":
        return None
    return value
def normalize_geocoded_row(row):
    """Project a geocoded-CSV row onto exactly the ``ALL_COLS`` keys.

    Columns absent from ``row`` are filled with ``""``; keys of ``row`` that
    are not in ``ALL_COLS`` are dropped.

    Args:
        row: Mapping produced by ``csv.DictReader`` for one input line.

    Returns:
        A new dict with one entry per ``ALL_COLS`` name, in that order.
    """
    projected = {}
    for name in ALL_COLS:
        projected[name] = row.get(name, "")
    return projected
def normalize_im3_row(row):
    """Adapt one IM3-schema row to the target table's column layout.

    Reads ``id``, ``operator``, ``name``, ``state``, ``state_abb``, ``sqft``,
    ``lon``, ``lat`` and ``type`` from ``row``; a missing key yields ``""``.
    Columns that are not filled from the row are set to ``""``, except for the
    fixed country, path and geocode source/status labels.

    Args:
        row: Mapping of IM3 column name to raw cell value.

    Returns:
        A new dict keyed by the target column names.
    """
    blank = ""
    fetch = row.get
    # An empty or missing "type" falls back to the generic "im3" label.
    precision = fetch("type", blank) or "im3"
    return dict(
        id=fetch("id", blank),
        provider=fetch("operator", blank),
        facility_name=fetch("name", blank),
        url=blank,
        provider_url=blank,
        country="United States",
        state=fetch("state", blank),
        state_code=fetch("state_abb", blank),
        city=blank,
        postal_code=blank,
        street_address=blank,
        address=blank,
        source_address=blank,
        phone=blank,
        area_sqft=fetch("sqft", blank),
        power_mw=blank,
        nearest_airport_miles=blank,
        has_bare_metal=blank,
        has_iaas=blank,
        has_internet_exchange=blank,
        has_colocation=blank,
        certifications=blank,
        content_summary=blank,
        path="IM3_Existing_DataCenters.csv",
        longitude=fetch("lon", blank),
        latitude=fetch("lat", blank),
        geocode_source="IM3_Existing_DataCenters",
        geocode_precision=precision,
        geocode_status="im3_imported",
        geocode_match_address=blank,
        census_status=blank,
        census_match_type=blank,
        census_input_address=blank,
        census_tiger_line_id=blank,
        census_side=blank,
        nominatim_display_name=blank,
        nominatim_osm_type=blank,
        nominatim_osm_id=blank,
    )
def read_and_normalize_rows(csv_path, source):
    """Read a CSV file and build insert-ready tuples ordered like ``ALL_COLS``.

    Each input row is normalized for its schema, rows without an id are
    dropped, and rows sharing an id (ignoring surrounding whitespace) are
    collapsed so that the last occurrence wins.

    Args:
        csv_path: Path of the CSV file to read.
        source: ``"im3"`` to adapt rows with ``normalize_im3_row``; any other
            value uses ``normalize_geocoded_row``.

    Returns:
        A ``(rows, values)`` pair: the raw ``csv.DictReader`` rows, and one
        tuple per unique id with every column passed through ``convert``.
    """
    # "utf-8-sig" reads plain UTF-8 unchanged but strips a leading byte-order
    # mark. With plain "utf-8" the BOM would become part of the first header
    # name, so a leading "id" column would no longer match and every row would
    # be silently skipped as id-less.
    with open(csv_path, newline="", encoding="utf-8-sig") as csv_file:
        rows = list(csv.DictReader(csv_file))

    normalize = normalize_im3_row if source == "im3" else normalize_geocoded_row

    deduped = {}
    for raw_row in rows:
        row = normalize(raw_row)
        row_id = (row.get("id") or "").strip()
        if not row_id:
            # No usable primary key: the row cannot be loaded.
            continue
        # Store the same stripped id that is used as the dedup key, so the
        # primary key written to the table matches what was deduplicated on.
        # `row` is a fresh dict from the normalizer; the raw rows stay intact.
        row["id"] = row_id
        deduped[row_id] = row

    values = [
        tuple(convert(row, column) for column in ALL_COLS)
        for row in deduped.values()
    ]
    return rows, values
def create_table(cur):
    """Create the target table ``TABLE`` using the given cursor.

    The table has one column per loadable field plus a stored generated
    ``geom`` point (SRID 4326) built from ``longitude`` and ``latitude``,
    both of which are declared ``not null``.

    Args:
        cur: Open database cursor; the statement is executed on it as is
            (no ``if not exists``, so the table must not exist yet).
    """
    column_defs = (
        "id text primary key",
        "provider text",
        "facility_name text",
        "url text",
        "provider_url text",
        "country text",
        "state text",
        "state_code text",
        "city text",
        "postal_code text",
        "street_address text",
        "address text",
        "source_address text",
        "phone text",
        "area_sqft integer",
        "power_mw numeric",
        "nearest_airport_miles numeric",
        "has_bare_metal boolean",
        "has_iaas boolean",
        "has_internet_exchange boolean",
        "has_colocation boolean",
        "certifications text",
        "content_summary text",
        "path text",
        "longitude double precision not null",
        "latitude double precision not null",
        "geocode_source text",
        "geocode_precision text",
        "geocode_status text",
        "geocode_match_address text",
        "census_status text",
        "census_match_type text",
        "census_input_address text",
        "census_tiger_line_id bigint",
        "census_side text",
        "nominatim_display_name text",
        "nominatim_osm_type text",
        "nominatim_osm_id bigint",
        # Geometry is derived by the database, never supplied by the loader.
        "geom geometry(Point, 4326) generated always as\n"
        "(ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)) stored",
    )
    # One definition per line, wrapped in the leading/trailing newlines of the
    # statement text.
    ddl = f"\ncreate table {TABLE} (\n" + ",\n".join(column_defs) + "\n)\n"
    cur.execute(ddl)
def insert_values(cur, values, upsert):
    """Bulk-insert converted rows into ``TABLE``.

    Args:
        cur: Open psycopg2 cursor.
        values: Sequence of tuples whose items follow the ``ALL_COLS`` order.
        upsert: When true, an ``on conflict (id) do update`` clause overwrites
            every non-id column of an existing row; when false the statement
            is a plain insert with no conflict handling.
    """
    column_list = ", ".join(ALL_COLS)
    statement = f"insert into {TABLE} ({column_list}) values %s"
    if upsert:
        # The conflict target itself (id) is never reassigned.
        assignments = ", ".join(
            f"{name} = excluded.{name}" for name in ALL_COLS if name != "id"
        )
        statement = f"{statement} on conflict (id) do update set {assignments}"
    # Rows are sent in batches of 100 per generated statement.
    execute_values(cur, statement, values, page_size=100)
def parse_args():
    """Parse the loader's command-line options from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``source`` (``"geocoded"`` or
        ``"im3"``), ``csv_path`` (``None`` unless given), ``append`` and
        ``upsert`` (both ``False`` unless the flag is present).
    """
    cli = argparse.ArgumentParser(
        description="Load data-center CSV data into public.us_dc_sample_geocoded."
    )
    # (flag, add_argument keyword arguments), registered in this order.
    options = (
        (
            "--source",
            {
                "choices": ["geocoded", "im3"],
                "default": "geocoded",
                "help": "Input schema type. Use 'im3' for new/IM3_Existing_DataCenters.csv.",
            },
        ),
        (
            "--csv-path",
            {
                "help": "Override input CSV path. If omitted, uses a source-specific default.",
            },
        ),
        (
            "--append",
            {
                "action": "store_true",
                "help": "Append/upsert into an existing target table instead of creating a new one.",
            },
        ),
        (
            "--upsert",
            {
                "action": "store_true",
                "help": "On id conflicts, update the existing row. Recommended with --append.",
            },
        ),
    )
    for flag, spec in options:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
def main():
    """Load the selected CSV into ``TABLE`` and print a one-line summary.

    Steps: parse the CLI options, read and convert the CSV, connect using the
    ``PGWEB_HOST`` / ``PGWEB_PORT`` / ``PGWEB_USER`` / ``PGWEB_PASSWORD``
    environment variables, create the table and its indexes when it does not
    exist yet, insert the rows, and run ``analyze``.

    Raises:
        KeyError: If one of the ``PGWEB_*`` environment variables is unset.
        RuntimeError: If the table already exists and ``--append`` was not given.
    """
    args = parse_args()

    # An explicit --csv-path wins; otherwise use the default for the source.
    if args.csv_path:
        csv_path = args.csv_path
    elif args.source == "im3":
        csv_path = IM3_CSV_PATH
    else:
        csv_path = CSV_PATH

    # Only the converted tuples are needed here; the raw rows are ignored.
    _, values = read_and_normalize_rows(csv_path, args.source)

    conn = psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        dbname=DB_NAME,
    )
    try:
        # All statements share one `with conn` block, so they run as a single
        # unit of work; the connection is closed separately in `finally`.
        with conn, conn.cursor() as cur:
            cur.execute("create extension if not exists postgis")
            # to_regclass yields NULL when the relation does not exist.
            cur.execute("select to_regclass(%s)", (TABLE,))
            table_missing = cur.fetchone()[0] is None
            if table_missing:
                create_table(cur)
                index_statements = (
                    f"create index us_dc_sample_geocoded_geom_gix on {TABLE} using gist (geom)",
                    f"create index us_dc_sample_geocoded_state_city_idx on {TABLE} (state_code, city)",
                )
                for index_sql in index_statements:
                    cur.execute(index_sql)
            elif not args.append:
                raise RuntimeError(
                    f"Target table {TABLE} already exists; use --append to add data."
                )
            insert_values(cur, values, upsert=args.upsert)
            cur.execute(f"analyze {TABLE}")
    finally:
        conn.close()

    source_label = "IM3-adapted" if args.source == "im3" else "geocoded"
    mode = "append" if args.append else "create"
    conflict_mode = "upsert" if args.upsert else "insert"
    summary = (
        f"loaded {len(values)} {source_label} rows into {TABLE} "
        f"(mode={mode}, conflict={conflict_mode}, csv={csv_path})"
    )
    print(summary)
# Run the loader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()