Files
data-centers/build_master_data_centers.py

259 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
Build (or refresh) public.master_data_centers by merging:
- public.us_dc_sample_geocoded (curated, attribute-rich)
- public.osm_data_centers (OpenStreetMap features)
Deduplication rule (curated row wins):
Step 1: for each curated row, find a matching OSM row by
curated.id = osm.osm_id::text OR
curated.nominatim_osm_id = osm.osm_id OR
ST_DWithin(curated.geom, osm.geom, 150 m, geography)
(closest match by sphere distance when multiple).
Step 2: insert every curated row into master, filling NULLs from the
matched OSM row when present. source = 'merged' if matched,
otherwise 'curated'.
Step 3: insert every OSM row whose osm_id was NOT matched in Step 1.
source = 'osm'.
Result: every curated row appears once; OSM-only rows appear once; no row is
emitted twice. The merge logic lives in a SQL function
public.refresh_master_data_centers() so subsequent refreshes are one call.
"""
import argparse
import os
import sys
import psycopg2
# Database that main() connects to.
DB_NAME = "data_centers"

# Schema-qualified relation names; they are interpolated into the SQL
# templates defined further down in this module.
MASTER_TABLE, CURATED_TABLE, OSM_TABLE = (
    "public.master_data_centers",
    "public.us_dc_sample_geocoded",
    "public.osm_data_centers",
)

# Default spatial match radius in meters; used both as the SQL function's
# parameter default and as the --radius-m command-line default.
MATCH_RADIUS_M = 150
# DDL for the master table and its supporting indexes.
#
# - Every statement uses "if not exists", so running it against a database
#   that already has the table/indexes is a no-op (an existing table is NOT
#   altered to match this definition).
# - `source` is constrained to 'curated', 'osm' or 'merged'.
# - `geom` is a stored generated column built from longitude/latitude as an
#   SRID 4326 point, so writers only supply the two coordinate columns.
# - The whole string is sent to the server in a single execute() call by
#   main(); it contains several semicolon-separated statements.
CREATE_TABLE_SQL = f"""
create table if not exists {MASTER_TABLE} (
master_id text primary key,
source text not null check (source in ('curated','osm','merged')),
curated_id text,
osm_id text,
name text,
operator text,
street_address text,
city text,
state text,
postal_code text,
country text,
website text,
phone text,
power_mw numeric,
area_sqft integer,
nearest_airport_miles numeric,
has_bare_metal boolean,
has_iaas boolean,
has_internet_exchange boolean,
has_colocation boolean,
certifications text,
content_summary text,
osm_tags jsonb,
matched_osm_tag_passes text[],
match_method text,
match_distance_m numeric,
longitude double precision not null,
latitude double precision not null,
geom geometry(Point, 4326)
generated always as (ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)) stored
);
create index if not exists master_data_centers_geom_gix on {MASTER_TABLE} using gist (geom);
create index if not exists master_data_centers_source_idx on {MASTER_TABLE} (source);
create index if not exists master_data_centers_state_idx on {MASTER_TABLE} (state);
create index if not exists master_data_centers_curated_idx on {MASTER_TABLE} (curated_id);
create index if not exists master_data_centers_osm_idx on {MASTER_TABLE} (osm_id);
"""
# Definition of public.refresh_master_data_centers(match_radius_m).
#
# The function empties the master table and rebuilds it in three phases:
#   1. Build a temporary curated -> OSM mapping holding the single best OSM
#      candidate per curated row (id equality, then nominatim id, then the
#      nearest feature within match_radius_m meters).
#   2. Insert every curated row, filling NULL descriptive columns from the
#      matched OSM row; source is 'merged' when a match exists, else 'curated'.
#   3. Insert every OSM row that no curated row claimed, with source 'osm'.
# It returns one row of counts: curated-only, merged, OSM-only and total.
#
# Differences from a naive implementation that matter to callers:
#   - The mapping table is dropped up front (schema-qualified with pg_temp so a
#     permanent table of the same name can never be hit). Without that, a
#     second call inside the same transaction would fail because the
#     "on commit drop" table from the first call still exists.
#   - The ranking ends with o.id as a tie-breaker so that equally ranked,
#     equidistant candidates resolve the same way on every refresh.
REFRESH_FUNCTION_SQL = f"""
create or replace function public.refresh_master_data_centers(match_radius_m double precision default {MATCH_RADIUS_M})
returns table (curated_rows bigint, merged_rows bigint, osm_only_rows bigint, total_rows bigint)
language plpgsql
as $$
begin
    truncate table {MASTER_TABLE};

    -- the mapping table lives until commit; remove any copy left behind by an
    -- earlier call in this same transaction so the function is re-runnable
    drop table if exists pg_temp._curated_to_osm;

    -- pick a single best OSM match for each curated row, prioritizing ID
    -- equality, then nominatim id, then closest within radius
    create temporary table _curated_to_osm on commit drop as
    with ranked as (
        select
            c.id as curated_id,
            o.id as osm_id,
            case
                when c.id = o.osm_id::text then 'id'
                when c.nominatim_osm_id = o.osm_id then 'nominatim_id'
                else 'spatial'
            end as method,
            ST_DistanceSphere(c.geom, o.geom) as dist_m,
            row_number() over (
                partition by c.id
                order by
                    case
                        when c.id = o.osm_id::text then 0
                        when c.nominatim_osm_id = o.osm_id then 1
                        else 2
                    end,
                    ST_DistanceSphere(c.geom, o.geom) asc,
                    -- deterministic winner when rank and distance tie
                    o.id asc
            ) as rn
        from {CURATED_TABLE} c
        join {OSM_TABLE} o
            on c.id = o.osm_id::text
            or c.nominatim_osm_id = o.osm_id
            or ST_DWithin(c.geom::geography, o.geom::geography, match_radius_m)
    )
    select curated_id, osm_id, method, dist_m
    from ranked
    where rn = 1;

    -- Step 1+2: insert curated rows (with OSM nulls filled where matched)
    insert into {MASTER_TABLE} (
        master_id, source, curated_id, osm_id,
        name, operator, street_address, city, state, postal_code, country,
        website, phone, power_mw, area_sqft, nearest_airport_miles,
        has_bare_metal, has_iaas, has_internet_exchange, has_colocation,
        certifications, content_summary,
        osm_tags, matched_osm_tag_passes,
        match_method, match_distance_m,
        longitude, latitude
    )
    select
        'curated/' || c.id,
        case when m.osm_id is not null then 'merged' else 'curated' end,
        c.id,
        m.osm_id,
        coalesce(c.facility_name, o.name),
        coalesce(c.provider, o.operator),
        coalesce(c.street_address, o.street_address),
        coalesce(c.city, o.city),
        coalesce(c.state_code, o.state),
        coalesce(c.postal_code, o.postal_code),
        coalesce(c.country, o.country),
        coalesce(c.url, o.website),
        coalesce(c.phone, o.phone),
        c.power_mw,
        c.area_sqft,
        c.nearest_airport_miles,
        c.has_bare_metal,
        c.has_iaas,
        c.has_internet_exchange,
        c.has_colocation,
        c.certifications,
        c.content_summary,
        o.tags,
        o.matched_tags,
        m.method,
        round(m.dist_m::numeric, 2),
        c.longitude,
        c.latitude
    from {CURATED_TABLE} c
    left join _curated_to_osm m on m.curated_id = c.id
    left join {OSM_TABLE} o on o.id = m.osm_id;

    -- Step 3: insert OSM rows that no curated row claimed
    insert into {MASTER_TABLE} (
        master_id, source, curated_id, osm_id,
        name, operator, street_address, city, state, postal_code, country,
        website, phone,
        osm_tags, matched_osm_tag_passes,
        longitude, latitude
    )
    select
        'osm/' || o.id,
        'osm',
        null,
        o.id,
        o.name,
        o.operator,
        o.street_address,
        o.city,
        o.state,
        o.postal_code,
        o.country,
        o.website,
        o.phone,
        o.tags,
        o.matched_tags,
        o.longitude,
        o.latitude
    from {OSM_TABLE} o
    where not exists (
        select 1 from _curated_to_osm m where m.osm_id = o.id
    );

    -- refresh planner statistics after the bulk reload
    analyze {MASTER_TABLE};

    return query
    select
        count(*) filter (where source = 'curated'),
        count(*) filter (where source = 'merged'),
        count(*) filter (where source = 'osm'),
        count(*)
    from {MASTER_TABLE};
end;
$$;
"""
def _radius_arg(text: str) -> float:
    """Convert a ``--radius-m`` value into a finite, non-negative float.

    Used as the argparse ``type`` callable so that unusable radii are rejected
    with a normal usage error before any database work starts.

    Raises:
        argparse.ArgumentTypeError: if *text* is not a number, or is negative,
            NaN or infinite.
    """
    try:
        radius = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # The chained comparison is False for NaN, negative values and +inf alike.
    if not 0 <= radius < float("inf"):
        raise argparse.ArgumentTypeError(
            f"radius must be a finite, non-negative number of meters, got {text!r}"
        )
    return radius


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        A namespace with ``radius_m`` (spatial match radius in meters,
        defaulting to ``MATCH_RADIUS_M``) and ``recreate`` (True when the
        master table should be dropped and rebuilt from scratch).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring is a structured, line-oriented description;
        # the default formatter would re-wrap it into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--radius-m",
        type=_radius_arg,
        default=MATCH_RADIUS_M,
        help=f"Spatial match radius in meters (default: {MATCH_RADIUS_M}).",
    )
    parser.add_argument(
        "--recreate",
        action="store_true",
        help=f"Drop and recreate {MASTER_TABLE} before building.",
    )
    return parser.parse_args()
def main() -> int:
    """Create/refresh the master table and print a row-count summary.

    Connection settings come from the PGWEB_HOST, PGWEB_PORT, PGWEB_USER and
    PGWEB_PASSWORD environment variables; the database name is ``DB_NAME``.

    Returns:
        0 on success, 1 when one or more required environment variables are
        not set. Database errors propagate as exceptions.
    """
    args = parse_args()

    # Check every required variable up front so the user sees the complete
    # list of what is missing instead of a KeyError for just the first one.
    env_to_kwarg = {
        "PGWEB_HOST": "host",
        "PGWEB_PORT": "port",
        "PGWEB_USER": "user",
        "PGWEB_PASSWORD": "password",
    }
    missing = [name for name in env_to_kwarg if name not in os.environ]
    if missing:
        print(
            "missing required environment variable(s): " + ", ".join(missing),
            file=sys.stderr,
        )
        return 1
    conn_kwargs = {kwarg: os.environ[name] for name, kwarg in env_to_kwarg.items()}

    conn = psycopg2.connect(dbname=DB_NAME, **conn_kwargs)
    try:
        # psycopg2's connection context manager commits on success and rolls
        # back on error, but does not close the connection -- hence the
        # explicit close() in the finally clause.
        with conn:
            with conn.cursor() as cur:
                cur.execute("create extension if not exists postgis")
                if args.recreate:
                    cur.execute(f"drop table if exists {MASTER_TABLE} cascade")
                cur.execute(CREATE_TABLE_SQL)
                # (Re)install the function on every run so the database always
                # carries the definition shipped with this script.
                cur.execute(REFRESH_FUNCTION_SQL)
                cur.execute(
                    "select * from public.refresh_master_data_centers(%s)",
                    (args.radius_m,),
                )
                curated, merged, osm_only, total = cur.fetchone()
    finally:
        conn.close()

    print(f"master_data_centers refreshed (radius={args.radius_m} m):")
    print(f" curated-only rows: {curated}")
    print(f" merged rows (curated + OSM): {merged}")
    print(f" osm-only rows: {osm_only}")
    print(f" total: {total}")
    return 0
# Script entry point: run the build and hand main()'s return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)