#!/usr/bin/env python3
"""Load internet_cables/*.json into PostGIS.

Reads:
  - internet_cables/all_cables.json          -> public.internet_cables (+ landing points)
  - internet_cables/city_dominance_2026.json -> public.internet_city_dominance
  - internet_cables/year-summaries.json      -> public.internet_cable_year_summaries
  - internet_cables/meta.json                -> public.internet_cable_meta

Requires env vars: PGWEB_HOST, PGWEB_PORT, PGWEB_USER, PGWEB_PASSWORD.
"""

import argparse
import json
import os
import re
from decimal import Decimal, InvalidOperation

import psycopg2
from psycopg2.extras import Json, execute_values

DATA_DIR = "internet_cables"
DB_NAME = "data_centers"

CABLES_TABLE = "public.internet_cables"
LANDINGS_TABLE = "public.internet_cable_landing_points"
CITY_TABLE = "public.internet_city_dominance"
YEAR_TABLE = "public.internet_cable_year_summaries"
META_TABLE = "public.internet_cable_meta"

# Matches a number (digits, commas, dots) followed by "km", e.g. "12,345 km".
LENGTH_KM_RE = re.compile(r"([\d,\.]+)\s*km", re.IGNORECASE)


def parse_length_km(raw):
    """Extract a kilometre figure from free text such as ``"12,345 km"``.

    Returns a ``Decimal`` with thousands separators (commas) removed, or
    ``None`` when *raw* is empty, is not a string, contains no ``<number> km``
    fragment, or the matched fragment is not a valid number (e.g. ``"... km"``).
    """
    # Non-string values cannot be regex-searched; treat them as "no length".
    if not raw or not isinstance(raw, str):
        return None
    match = LENGTH_KM_RE.search(raw)
    if not match:
        return None
    try:
        return Decimal(match.group(1).replace(",", ""))
    except InvalidOperation:
        # The pattern also matches punctuation-only runs such as "." or ",,".
        return None


def to_int(value):
    """Return ``int(value)``, or ``None`` for ``None``, ``""`` or unparseable input."""
    if value in (None, ""):
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def to_bool(value):
    """Return ``bool(value)``, preserving ``None`` as ``None``.

    NOTE(review): plain truthiness is used, so a non-empty string such as
    ``"false"`` maps to ``True`` -- presumably the input is a JSON boolean;
    confirm against the data.
    """
    if value is None:
        return None
    return bool(value)


def linestring_to_wkt(coords):
    """Render a sequence of positions as a WKT coordinate list ``(x y, x y, ...)``.

    Only the first two ordinates of each position are used, so GeoJSON
    positions that carry an optional third (altitude) value are accepted
    instead of failing on tuple unpacking.
    """
    return "(" + ", ".join(f"{point[0]} {point[1]}" for point in coords) + ")"


def feature_to_multilinestring_wkt(geometry):
    """Convert a GeoJSON-style geometry dict to ``MULTILINESTRING`` WKT.

    ``LineString`` input is wrapped as a single-part multi-line. Lines with
    fewer than two positions are dropped. Returns ``None`` for any other
    geometry type or when no usable line remains.
    """
    gtype = geometry.get("type")
    coords = geometry.get("coordinates") or []
    if gtype == "MultiLineString":
        parts = [linestring_to_wkt(line) for line in coords if len(line) >= 2]
    elif gtype == "LineString":
        parts = [linestring_to_wkt(coords)] if len(coords) >= 2 else []
    else:
        return None
    if not parts:
        return None
    return "MULTILINESTRING(" + ", ".join(parts) + ")"


def create_cable_tables(cur):
    """Create the cables table, the landing-points table and their indexes."""
    cur.execute(
        f"""
        create table {CABLES_TABLE} (
            feature_id text primary key,
            cable_id text,
            name text,
            color text,
            owners text,
            rfs_year integer,
            decommission_year integer,
            length_raw text,
            length_km numeric,
            cable_type text,
            url text,
            extra_urls jsonb,
            properties jsonb,
            geom geometry(MultiLineString, 4326)
        )
        """
    )
    cur.execute(
        f"create index internet_cables_geom_gix on {CABLES_TABLE} using gist (geom)"
    )
    cur.execute(
        f"create index internet_cables_cable_id_idx on {CABLES_TABLE} (cable_id)"
    )
    cur.execute(
        f"create index internet_cables_rfs_year_idx on {CABLES_TABLE} (rfs_year)"
    )
    cur.execute(
        f"""
        create table {LANDINGS_TABLE} (
            feature_id text references {CABLES_TABLE}(feature_id) on delete cascade,
            ordinal integer,
            landing_id text,
            name text,
            country text,
            is_tbd boolean,
            primary key (feature_id, ordinal)
        )
        """
    )
    cur.execute(
        f"create index internet_cable_landings_landing_id_idx on {LANDINGS_TABLE} (landing_id)"
    )
    cur.execute(
        f"create index internet_cable_landings_country_idx on {LANDINGS_TABLE} (country)"
    )


def create_city_table(cur):
    """Create the city-dominance table; ``geom`` is generated from lon/lat."""
    cur.execute(
        f"""
        create table {CITY_TABLE} (
            id text primary key,
            city text,
            country text,
            country_name text,
            region text,
            status text,
            physical_capacity_tbps numeric,
            added_physical_capacity_tbps numeric,
            logical_dominance_ips bigint,
            top_asns jsonb,
            longitude double precision,
            latitude double precision,
            geom geometry(Point, 4326) generated always as
                (ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)) stored
        )
        """
    )
    cur.execute(
        f"create index internet_city_dominance_geom_gix on {CITY_TABLE} using gist (geom)"
    )
    cur.execute(
        f"create index internet_city_dominance_country_idx on {CITY_TABLE} (country)"
    )


def create_year_table(cur):
    """Create the per-year summary table (one row per year)."""
    cur.execute(
        f"""
        create table {YEAR_TABLE} (
            year integer primary key,
            description text
        )
        """
    )


def create_meta_table(cur):
    """Create the key/value metadata table."""
    cur.execute(
        f"""
        create table {META_TABLE} (
            key text primary key,
            value text
        )
        """
    )


def load_cables(cur, path):
    """Insert cable features and their landing points from the JSON at *path*.

    The file may hold either a bare list of features or an object with a
    ``"features"`` list. Each feature's id comes from ``properties.feature_id``,
    then ``properties.id``, then ``legacy-<index>``; duplicates get a
    ``-<n>`` suffix so the primary key stays unique.

    Returns ``(cable_count, landing_point_count)``.
    """
    with open(path, encoding="utf-8") as fh:
        features = json.load(fh)
    # Accept a FeatureCollection-style wrapper as well as a bare feature list.
    if isinstance(features, dict):
        features = features.get("features") or []

    cable_rows = []
    landing_rows = []
    used_feature_ids = set()

    for idx, feature in enumerate(features):
        props = feature.get("properties") or {}
        feature_id = props.get("feature_id") or props.get("id")
        if not feature_id:
            feature_id = f"legacy-{idx}"
        # The key column is text: normalise now so that e.g. 5 and "5" are
        # recognised as the same id by the collision check below.
        feature_id = str(feature_id)

        # Disambiguate any residual collisions
        base = feature_id
        suffix = 1
        while feature_id in used_feature_ids:
            feature_id = f"{base}-{suffix}"
            suffix += 1
        used_feature_ids.add(feature_id)

        # length may also live in a top-level lengthKm field on legacy entries
        length_raw = props.get("length")
        length_km = parse_length_km(length_raw)
        if length_km is None and feature.get("lengthKm") is not None:
            try:
                length_km = Decimal(str(feature["lengthKm"]))
            except InvalidOperation:
                # Unparseable legacy value: leave length_km as NULL.
                pass

        # None here ends up as a NULL geometry via ST_GeomFromText(NULL, 4326).
        wkt = feature_to_multilinestring_wkt(feature.get("geometry") or {})

        cable_rows.append(
            (
                feature_id,
                props.get("id"),
                props.get("name"),
                props.get("color"),
                props.get("owners"),
                to_int(props.get("rfs_year")),
                to_int(props.get("decommission_year")),
                length_raw,
                length_km,
                props.get("type"),
                props.get("url"),
                Json(props.get("extraUrls") or []),
                Json(props),
                wkt,
            )
        )

        for ordinal, lp in enumerate(props.get("landing_points") or []):
            landing_rows.append(
                (
                    feature_id,
                    ordinal,
                    lp.get("id") or None,
                    lp.get("name"),
                    lp.get("country"),
                    to_bool(lp.get("is_tbd")),
                )
            )

    execute_values(
        cur,
        f"""
        insert into {CABLES_TABLE} (
            feature_id, cable_id, name, color, owners, rfs_year,
            decommission_year, length_raw, length_km, cable_type, url,
            extra_urls, properties, geom
        ) values %s
        """,
        cable_rows,
        template=(
            "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
            "ST_GeomFromText(%s, 4326))"
        ),
        page_size=200,
    )
    execute_values(
        cur,
        f"""
        insert into {LANDINGS_TABLE}
            (feature_id, ordinal, landing_id, name, country, is_tbd)
        values %s
        """,
        landing_rows,
        page_size=500,
    )
    return len(cable_rows), len(landing_rows)


def load_city_dominance(cur, path):
    """Insert city-dominance points from the JSON list at *path*.

    Items without an ``id`` and repeats of an already-seen ``id`` are skipped.
    ``coordinates`` is read as ``[lon, lat]``; missing values become NULL.

    Returns the number of rows inserted.
    """
    with open(path, encoding="utf-8") as fh:
        items = json.load(fh)

    rows = []
    seen = set()
    for item in items:
        item_id = item.get("id")
        if not item_id or item_id in seen:
            continue
        seen.add(item_id)
        coords = item.get("coordinates") or [None, None]
        # Pad so a short coordinate list still unpacks into two values.
        lon, lat = (coords + [None, None])[:2]
        rows.append(
            (
                item_id,
                item.get("city"),
                item.get("country"),
                item.get("country_name"),
                item.get("region"),
                item.get("status"),
                item.get("physical_capacity_tbps"),
                item.get("added_physical_capacity_tbps"),
                item.get("logical_dominance_ips"),
                Json(item.get("top_asns") or []),
                lon,
                lat,
            )
        )

    execute_values(
        cur,
        f"""
        insert into {CITY_TABLE} (
            id, city, country, country_name, region, status,
            physical_capacity_tbps, added_physical_capacity_tbps,
            logical_dominance_ips, top_asns, longitude, latitude
        ) values %s
        """,
        rows,
        page_size=500,
    )
    return len(rows)


def load_year_summaries(cur, path):
    """Insert ``year -> description`` rows from the JSON object at *path*.

    Keys that are not integers are skipped. A value may be a dict (its
    ``"description"`` entry is used) or any other value, which is stored as
    text; a JSON ``null`` is stored as NULL rather than the text ``"None"``.

    Returns the number of rows inserted.
    """
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)

    rows = []
    for year_key, value in data.items():
        year = to_int(year_key)
        if year is None:
            continue
        if isinstance(value, dict):
            description = value.get("description")
        elif value is None:
            description = None
        else:
            description = str(value)
        rows.append((year, description))

    execute_values(
        cur,
        f"insert into {YEAR_TABLE} (year, description) values %s",
        rows,
        page_size=200,
    )
    return len(rows)


def _meta_value_to_text(value):
    """Serialise one metadata value for the text ``value`` column."""
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        # Store nested structures as JSON text rather than a Python repr.
        return json.dumps(value, ensure_ascii=False)
    return str(value)


def load_meta(cur, path):
    """Insert key/value metadata rows from the JSON object at *path*.

    Scalars are stored via ``str()``, nested objects/arrays as JSON text and
    ``null`` as NULL.

    Returns the number of rows inserted.
    """
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)

    rows = [(str(key), _meta_value_to_text(value)) for key, value in data.items()]
    execute_values(
        cur,
        f"insert into {META_TABLE} (key, value) values %s",
        rows,
    )
    return len(rows)


def parse_args():
    """Parse command-line options (``--data-dir``, ``--replace``)."""
    parser = argparse.ArgumentParser(
        description="Load internet_cables/*.json into PostGIS."
    )
    parser.add_argument(
        "--data-dir",
        default=DATA_DIR,
        help=f"Directory containing the JSON files (default: {DATA_DIR})",
    )
    parser.add_argument(
        "--replace",
        action="store_true",
        help="Drop existing target tables before loading.",
    )
    return parser.parse_args()


def main():
    """Create the target tables and load all four JSON files in one transaction.

    Raises:
        FileNotFoundError: if any expected JSON file is missing.
        KeyError: if a required ``PGWEB_*`` environment variable is unset.
        RuntimeError: if a target table already exists and ``--replace`` was
            not given.
    """
    args = parse_args()

    cables_path = os.path.join(args.data_dir, "all_cables.json")
    city_path = os.path.join(args.data_dir, "city_dominance_2026.json")
    year_path = os.path.join(args.data_dir, "year-summaries.json")
    meta_path = os.path.join(args.data_dir, "meta.json")

    # Fail before touching the database if any input is missing.
    for path in [cables_path, city_path, year_path, meta_path]:
        if not os.path.exists(path):
            raise FileNotFoundError(path)

    all_tables = [CABLES_TABLE, LANDINGS_TABLE, CITY_TABLE, YEAR_TABLE, META_TABLE]

    conn = psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        dbname=DB_NAME,
    )
    try:
        # "with conn" commits on success and rolls back on any exception, so a
        # failed load leaves no partially created tables behind.
        with conn:
            with conn.cursor() as cur:
                cur.execute("create extension if not exists postgis")

                if args.replace:
                    cur.execute(
                        f"drop table if exists {LANDINGS_TABLE}, {CABLES_TABLE}, "
                        f"{CITY_TABLE}, {YEAR_TABLE}, {META_TABLE} cascade"
                    )

                # Check every target (including the landing-points table,
                # which create_cable_tables also creates) before creating any.
                for table in all_tables:
                    cur.execute("select to_regclass(%s)", (table,))
                    if cur.fetchone()[0] is not None:
                        raise RuntimeError(
                            f"Target table {table} already exists; rerun with --replace to overwrite."
                        )

                for creator in (
                    create_cable_tables,
                    create_city_table,
                    create_year_table,
                    create_meta_table,
                ):
                    creator(cur)

                cable_count, landing_count = load_cables(cur, cables_path)
                city_count = load_city_dominance(cur, city_path)
                year_count = load_year_summaries(cur, year_path)
                meta_count = load_meta(cur, meta_path)

                for table in all_tables:
                    cur.execute(f"analyze {table}")
    finally:
        conn.close()

    print(
        f"loaded {cable_count} cables, {landing_count} landing points, "
        f"{city_count} city-dominance points, {year_count} year summaries, "
        f"{meta_count} meta rows."
    )


if __name__ == "__main__":
    main()