#!/usr/bin/env python3
"""
Fetch US data centers from OpenStreetMap (Overpass API) and load them into
public.osm_data_centers in the data_centers database. Also (re)creates a
unioned view public.data_centers_union combining OSM + curated rows from
public.us_dc_sample_geocoded.

Two Overpass passes are made because tagging is inconsistent:
  1) telecom=data_center
  2) building=data_center

Results are deduplicated by (osm_type, osm_id); the matched tag-pass is
recorded in matched_tags so we can see which query found each feature.
"""

import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import psycopg2
import requests
from psycopg2.extras import Json, execute_values

PROJECT_ROOT = Path(__file__).parent.parent
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
TABLE = "public.osm_data_centers"
VIEW = "public.data_centers_union"
CURATED_TABLE = "public.us_dc_sample_geocoded"
DB_NAME = "data_centers"

# Tag passes: (key, value)
TAG_PASSES = [
    ("telecom", "data_center"),
    ("building", "data_center"),
]


def overpass_query(tag_key: str, tag_value: str, timeout: int = 180) -> str:
    """Build the Overpass QL query for one tag pass over the US area.

    Args:
        tag_key: OSM tag key to match (e.g. "telecom").
        tag_value: OSM tag value to match (e.g. "data_center").
        timeout: Server-side query timeout in seconds, embedded in the query.

    Returns:
        The query text selecting nodes, ways and relations carrying the tag,
        with `out center tags` so ways/relations come back with a center point.
    """
    return f"""
[out:json][timeout:{timeout}];
area["ISO3166-1"="US"][admin_level=2]->.us;
(
  node["{tag_key}"="{tag_value}"](area.us);
  way["{tag_key}"="{tag_value}"](area.us);
  relation["{tag_key}"="{tag_value}"](area.us);
);
out center tags;
""".strip()


def _load_cached_payload(cache_path: str) -> Optional[dict]:
    """Read a cached Overpass response, or return None if it is unusable."""
    try:
        with open(cache_path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except json.JSONDecodeError:
        # A truncated/corrupt cache should not wedge every later run.
        print(
            f" cache file is not valid JSON, refetching: {cache_path}",
            file=sys.stderr,
        )
        return None


def fetch_pass(tag_key: str, tag_value: str, cache_path: Optional[str]) -> List[dict]:
    """Return the Overpass elements for one tag pass, using the cache if present.

    Args:
        tag_key: OSM tag key for this pass.
        tag_value: OSM tag value for this pass.
        cache_path: JSON file to read the response from / write it to, or
            None to bypass caching entirely.

    Returns:
        The "elements" list of the response (empty if the key is absent).

    Raises:
        requests.HTTPError: If Overpass answers with a non-200 status.
    """
    payload = None
    if cache_path and os.path.exists(cache_path):
        print(f" using cached response: {cache_path}")
        payload = _load_cached_payload(cache_path)

    if payload is None:
        query = overpass_query(tag_key, tag_value)
        print(f" querying Overpass for {tag_key}={tag_value} ...")
        headers = {
            "User-Agent": "us-data-centers-inventory/1.0 (research; contact david@dadams.io)",
            "Accept": "application/json",
        }
        # Client timeout is longer than the query's server-side timeout so the
        # server gets the chance to answer first.
        resp = requests.post(
            OVERPASS_URL,
            data={"data": query},
            headers=headers,
            timeout=240,
        )
        if resp.status_code != 200:
            print(f" Overpass returned {resp.status_code}: {resp.text[:500]}")
            resp.raise_for_status()
        payload = resp.json()

        # Overpass reports runtime errors (timeout, out of memory) with HTTP
        # 200 plus a "remark"; such a response may be incomplete, so surface
        # it and do not cache it.
        remark = payload.get("remark")
        if remark:
            print(f" Overpass remark: {remark}", file=sys.stderr)
        if cache_path and not remark:
            # Write to a temp file and rename so an interrupted run never
            # leaves a half-written cache behind.
            tmp_path = f"{cache_path}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                json.dump(payload, fh)
            os.replace(tmp_path, cache_path)
            print(f" cached to {cache_path}")

    elements = payload.get("elements", [])
    print(f" pass returned {len(elements)} elements")
    return elements


def element_coords(elem: dict) -> Tuple[Optional[float], Optional[float]]:
    """Return (lon, lat) for an element, or None for any missing coordinate.

    Nodes carry "lon"/"lat" directly; other element types are read from the
    "center" object that `out center` adds.
    """
    if elem.get("type") == "node":
        return elem.get("lon"), elem.get("lat")
    center = elem.get("center") or {}
    return center.get("lon"), center.get("lat")


def normalize_element(elem: dict, matched_tag: str) -> Optional[dict]:
    """Flatten one Overpass element into a row dict keyed by COLUMNS.

    Args:
        elem: Raw element from the Overpass "elements" list.
        matched_tag: "key=value" label of the pass that returned the element.

    Returns:
        The row dict, or None when the element lacks coordinates, a type or
        an id.
    """
    lon, lat = element_coords(elem)
    if lon is None or lat is None:
        return None
    osm_type = elem.get("type")
    osm_id = elem.get("id")
    if osm_type is None or osm_id is None:
        return None
    tags = elem.get("tags") or {}
    street_parts = (tags.get("addr:housenumber"), tags.get("addr:street"))
    return {
        "id": f"{osm_type}/{osm_id}",
        "osm_type": osm_type,
        "osm_id": int(osm_id),
        "name": tags.get("name"),
        "operator": tags.get("operator"),
        "operator_type": tags.get("operator:type"),
        "telecom": tags.get("telecom"),
        "building": tags.get("building"),
        "power": tags.get("power"),
        "website": tags.get("website") or tags.get("contact:website"),
        "phone": tags.get("phone") or tags.get("contact:phone"),
        # Empty join result becomes None rather than "".
        "street_address": " ".join(part for part in street_parts if part) or None,
        "city": tags.get("addr:city"),
        "state": tags.get("addr:state"),
        "postal_code": tags.get("addr:postcode"),
        "country": tags.get("addr:country") or "US",
        "matched_tags": [matched_tag],
        "tags": tags,
        "longitude": float(lon),
        "latitude": float(lat),
    }


def merge_records(existing: Dict[str, dict], new_rows: List[dict]) -> None:
    """Merge new_rows into existing in place, keyed by row["id"].

    For an id already present, matched_tags are unioned (order preserved) and
    every other field keeps its first non-empty value.
    """
    for row in new_rows:
        key = row["id"]
        prior = existing.get(key)
        if prior is None:
            existing[key] = row
            continue
        # merge matched_tags; keep first non-null values for other fields
        prior["matched_tags"] = list(
            dict.fromkeys(prior["matched_tags"] + row["matched_tags"])
        )
        for col, val in row.items():
            if col == "matched_tags":
                continue
            if prior.get(col) in (None, "") and val not in (None, ""):
                prior[col] = val


COLUMNS = [
    "id",
    "osm_type",
    "osm_id",
    "name",
    "operator",
    "operator_type",
    "telecom",
    "building",
    "power",
    "website",
    "phone",
    "street_address",
    "city",
    "state",
    "postal_code",
    "country",
    "matched_tags",
    "tags",
    "longitude",
    "latitude",
]


def row_to_tuple(row: dict) -> tuple:
    """Convert a row dict to a tuple in COLUMNS order, wrapping tags as Json."""
    return (
        row["id"],
        row["osm_type"],
        row["osm_id"],
        row.get("name"),
        row.get("operator"),
        row.get("operator_type"),
        row.get("telecom"),
        row.get("building"),
        row.get("power"),
        row.get("website"),
        row.get("phone"),
        row.get("street_address"),
        row.get("city"),
        row.get("state"),
        row.get("postal_code"),
        row.get("country"),
        row.get("matched_tags", []),
        Json(row.get("tags", {})),
        row["longitude"],
        row["latitude"],
    )


def create_table(cur) -> None:
    """Create TABLE and its geom/state/tags indexes using the given cursor."""
    cur.execute(
        f"""
        create table {TABLE} (
            id text primary key,
            osm_type text not null,
            osm_id bigint not null,
            name text,
            operator text,
            operator_type text,
            telecom text,
            building text,
            power text,
            website text,
            phone text,
            street_address text,
            city text,
            state text,
            postal_code text,
            country text,
            matched_tags text[] not null default '{{}}',
            tags jsonb not null default '{{}}'::jsonb,
            longitude double precision not null,
            latitude double precision not null,
            ingested_at timestamptz not null default now(),
            geom geometry(Point, 4326) generated always as
                (ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)) stored
        )
        """
    )
    cur.execute(f"create index osm_data_centers_geom_gix on {TABLE} using gist (geom)")
    cur.execute(f"create index osm_data_centers_state_idx on {TABLE} (state)")
    cur.execute(f"create index osm_data_centers_tags_gin on {TABLE} using gin (tags)")


def insert_values(cur, rows: List[dict], upsert: bool) -> None:
    """Bulk-insert rows into TABLE.

    Args:
        cur: Open database cursor.
        rows: Row dicts as produced by normalize_element.
        upsert: When true, an id conflict updates every non-id column and
            refreshes ingested_at; when false, a plain insert is issued.
    """
    if not rows:
        return
    sql = f"insert into {TABLE} ({', '.join(COLUMNS)}) values %s"
    if upsert:
        update_cols = [c for c in COLUMNS if c != "id"]
        assignments = ", ".join(f"{c} = excluded.{c}" for c in update_cols)
        sql += (
            f" on conflict (id) do update set {assignments}, "
            f"ingested_at = now()"
        )
    execute_values(cur, sql, [row_to_tuple(r) for r in rows], page_size=200)


def create_or_replace_view(cur) -> None:
    """Create or replace VIEW as the union of CURATED_TABLE and TABLE rows."""
    cur.execute(
        f"""
        create or replace view {VIEW} as
        select
            'curated/' || id as id,
            'curated'::text as source,
            facility_name as name,
            provider as operator,
            street_address,
            city,
            state_code as state,
            postal_code,
            country,
            url as website,
            phone,
            longitude,
            latitude,
            geom
        from {CURATED_TABLE}
        union all
        select
            id,
            'osm'::text as source,
            name,
            operator,
            street_address,
            city,
            state,
            postal_code,
            country,
            website,
            phone,
            longitude,
            latitude,
            geom
        from {TABLE}
        """
    )


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the loader."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--cache-dir",
        default=str(PROJECT_ROOT / "output"),
        help="Directory to cache raw Overpass responses (default: output/).",
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Do not read or write Overpass cache files; always hit the API.",
    )
    parser.add_argument(
        "--recreate",
        action="store_true",
        help=f"Drop and recreate {TABLE} before loading.",
    )
    parser.add_argument(
        "--upsert",
        dest="upsert",
        action="store_true",
        default=True,
        help="On id conflicts, update the existing row (default: on).",
    )
    # Without this flag the default-on --upsert could never be turned off.
    parser.add_argument(
        "--no-upsert",
        dest="upsert",
        action="store_false",
        help="Issue a plain insert instead of updating rows on id conflicts.",
    )
    parser.add_argument(
        "--skip-view",
        action="store_true",
        help=f"Do not create/replace the unioned view {VIEW}.",
    )
    return parser.parse_args()


def main() -> int:
    """Fetch every tag pass, merge the results and load them into the database.

    Returns:
        Process exit code: 0 on success, 1 when no rows were fetched.
    """
    args = parse_args()
    if not args.no_cache:
        os.makedirs(args.cache_dir, exist_ok=True)

    merged: Dict[str, dict] = {}
    last_index = len(TAG_PASSES) - 1
    for index, (tag_key, tag_value) in enumerate(TAG_PASSES):
        cache_path = (
            None
            if args.no_cache
            else os.path.join(args.cache_dir, f"overpass_{tag_key}_{tag_value}.json")
        )
        # Checked before fetching, because a fresh fetch creates the file.
        had_cache = cache_path is not None and os.path.exists(cache_path)
        print(f"Pass: {tag_key}={tag_value}")
        elements = fetch_pass(tag_key, tag_value, cache_path)
        normalized = [
            row
            for row in (normalize_element(e, f"{tag_key}={tag_value}") for e in elements)
            if row is not None
        ]
        print(f" normalized {len(normalized)} rows with coords")
        merge_records(merged, normalized)
        # be polite to Overpass between passes; no pause is needed after the
        # final pass or after a pass that found a cache file
        if index < last_index and not had_cache:
            time.sleep(2)

    rows = list(merged.values())
    print(f"Total deduped OSM data-center features: {len(rows)}")
    if not rows:
        print("No rows fetched; aborting DB load.", file=sys.stderr)
        return 1

    conn = psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        dbname=DB_NAME,
    )
    try:
        # `with conn` wraps the whole load in a single transaction.
        with conn:
            with conn.cursor() as cur:
                cur.execute("create extension if not exists postgis")
                if args.recreate:
                    cur.execute(f"drop table if exists {TABLE} cascade")
                cur.execute("select to_regclass(%s)", (TABLE,))
                if cur.fetchone()[0] is None:
                    create_table(cur)
                insert_values(cur, rows, upsert=args.upsert)
                cur.execute(f"analyze {TABLE}")
                if not args.skip_view:
                    # The view selects from the curated table, so it can only
                    # be built when that table exists.
                    cur.execute("select to_regclass(%s)", (CURATED_TABLE,))
                    if cur.fetchone()[0] is not None:
                        create_or_replace_view(cur)
                        print(f"View {VIEW} (re)created.")
                    else:
                        print(
                            f"Skipping view: {CURATED_TABLE} does not exist.",
                            file=sys.stderr,
                        )
                cur.execute(f"select count(*) from {TABLE}")
                total = cur.fetchone()[0]
    finally:
        conn.close()

    print(f"Loaded {len(rows)} rows into {TABLE}; table now has {total} rows total.")
    return 0


if __name__ == "__main__":
    sys.exit(main())