259 lines
8.7 KiB
Python
259 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Build (or refresh) public.master_data_centers by merging:
  - public.us_dc_sample_geocoded  (curated, attribute-rich)
  - public.osm_data_centers       (OpenStreetMap features)

Deduplication rule (curated row wins):
  Step 1: for each curated row, find a matching OSM row by
            curated.id = osm.osm_id::text          OR
            curated.nominatim_osm_id = osm.osm_id  OR
            ST_DWithin(curated.geom, osm.geom, radius, geography)
          where radius defaults to 150 m. When several OSM rows match,
          an ID match beats a Nominatim ID match, which beats a purely
          spatial match; remaining ties go to the closest row by sphere
          distance.
  Step 2: insert every curated row into master, filling NULLs from the
          matched OSM row when present. source = 'merged' if matched,
          otherwise 'curated'.
  Step 3: insert every OSM row whose osm_id was NOT matched in Step 1.
          source = 'osm'.

Result: every curated row appears once; OSM-only rows appear once; no row is
emitted twice. The merge logic lives in a SQL function
public.refresh_master_data_centers() so subsequent refreshes are one call.
|
|
"""
|
|
import argparse
|
|
import os
|
|
import sys
|
|
|
|
import psycopg2
|
|
|
|
# Name of the PostgreSQL database this script connects to.
DB_NAME: str = "data_centers"

# Schema-qualified table names: the merged output, then its two inputs.
MASTER_TABLE: str = "public.master_data_centers"
CURATED_TABLE: str = "public.us_dc_sample_geocoded"
OSM_TABLE: str = "public.osm_data_centers"

# Default spatial match radius, in meters.
MATCH_RADIUS_M: int = 150
|
|
|
|
|
|
# DDL for the merged output table and its supporting indexes.
#
# Every statement uses IF NOT EXISTS, so running it against a database that
# already has the table or indexes leaves them untouched.  `geom` is a stored
# generated column built from longitude/latitude (SRID 4326), so writers only
# supply the two coordinate columns.
CREATE_TABLE_SQL = f"""
create table if not exists {MASTER_TABLE} (
    master_id text primary key,
    source text not null check (source in ('curated','osm','merged')),
    curated_id text,
    osm_id text,
    name text,
    operator text,
    street_address text,
    city text,
    state text,
    postal_code text,
    country text,
    website text,
    phone text,
    power_mw numeric,
    area_sqft integer,
    nearest_airport_miles numeric,
    has_bare_metal boolean,
    has_iaas boolean,
    has_internet_exchange boolean,
    has_colocation boolean,
    certifications text,
    content_summary text,
    osm_tags jsonb,
    matched_osm_tag_passes text[],
    match_method text,
    match_distance_m numeric,
    longitude double precision not null,
    latitude double precision not null,
    geom geometry(Point, 4326)
        generated always as (ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)) stored
);
create index if not exists master_data_centers_geom_gix on {MASTER_TABLE} using gist (geom);
create index if not exists master_data_centers_source_idx on {MASTER_TABLE} (source);
create index if not exists master_data_centers_state_idx on {MASTER_TABLE} (state);
create index if not exists master_data_centers_curated_idx on {MASTER_TABLE} (curated_id);
create index if not exists master_data_centers_osm_idx on {MASTER_TABLE} (osm_id);
"""
|
|
|
|
|
|
# Defines public.refresh_master_data_centers(match_radius_m).
#
# The function truncates the master table, pairs each curated row with at most
# one OSM row, inserts all curated rows (source 'merged' or 'curated'), then
# inserts the OSM rows nobody claimed (source 'osm').  It returns one row of
# counts: curated-only, merged, OSM-only, total.
#
# Changes relative to the earlier definition:
#   * the scratch table is dropped up front, so the function can be called
#     more than once inside a single transaction (ON COMMIT DROP only removes
#     it at commit time);
#   * the match ranking ends with o.id, so equal-distance candidates resolve
#     the same way on every run.
REFRESH_FUNCTION_SQL = f"""
create or replace function public.refresh_master_data_centers(match_radius_m double precision default {MATCH_RADIUS_M})
returns table (curated_rows bigint, merged_rows bigint, osm_only_rows bigint, total_rows bigint)
language plpgsql
as $$
begin
    truncate table {MASTER_TABLE};

    -- a previous call in this same transaction would have left the scratch
    -- table behind; pg_temp keeps the drop away from any permanent table
    drop table if exists pg_temp._curated_to_osm;

    -- pick a single best OSM match for each curated row, prioritizing ID
    -- equality, then nominatim id, then closest within radius
    create temporary table _curated_to_osm on commit drop as
    with ranked as (
        select
            c.id as curated_id,
            o.id as osm_id,
            case
                when c.id = o.osm_id::text then 'id'
                when c.nominatim_osm_id = o.osm_id then 'nominatim_id'
                else 'spatial'
            end as method,
            ST_DistanceSphere(c.geom, o.geom) as dist_m,
            row_number() over (
                partition by c.id
                order by
                    case
                        when c.id = o.osm_id::text then 0
                        when c.nominatim_osm_id = o.osm_id then 1
                        else 2
                    end,
                    ST_DistanceSphere(c.geom, o.geom) asc,
                    o.id asc  -- deterministic tie-breaker
            ) as rn
        from {CURATED_TABLE} c
        join {OSM_TABLE} o
            on c.id = o.osm_id::text
            or c.nominatim_osm_id = o.osm_id
            or ST_DWithin(c.geom::geography, o.geom::geography, match_radius_m)
    )
    select curated_id, osm_id, method, dist_m
    from ranked
    where rn = 1;

    -- Step 1+2: insert curated rows (with OSM nulls filled where matched)
    insert into {MASTER_TABLE} (
        master_id, source, curated_id, osm_id,
        name, operator, street_address, city, state, postal_code, country,
        website, phone, power_mw, area_sqft, nearest_airport_miles,
        has_bare_metal, has_iaas, has_internet_exchange, has_colocation,
        certifications, content_summary,
        osm_tags, matched_osm_tag_passes,
        match_method, match_distance_m,
        longitude, latitude
    )
    select
        'curated/' || c.id,
        case when m.osm_id is not null then 'merged' else 'curated' end,
        c.id,
        m.osm_id,
        coalesce(c.facility_name, o.name),
        coalesce(c.provider, o.operator),
        coalesce(c.street_address, o.street_address),
        coalesce(c.city, o.city),
        coalesce(c.state_code, o.state),
        coalesce(c.postal_code, o.postal_code),
        coalesce(c.country, o.country),
        coalesce(c.url, o.website),
        coalesce(c.phone, o.phone),
        c.power_mw,
        c.area_sqft,
        c.nearest_airport_miles,
        c.has_bare_metal,
        c.has_iaas,
        c.has_internet_exchange,
        c.has_colocation,
        c.certifications,
        c.content_summary,
        o.tags,
        o.matched_tags,
        m.method,
        round(m.dist_m::numeric, 2),
        c.longitude,
        c.latitude
    from {CURATED_TABLE} c
    left join _curated_to_osm m on m.curated_id = c.id
    left join {OSM_TABLE} o on o.id = m.osm_id;

    -- Step 3: insert OSM rows that no curated row claimed
    insert into {MASTER_TABLE} (
        master_id, source, curated_id, osm_id,
        name, operator, street_address, city, state, postal_code, country,
        website, phone,
        osm_tags, matched_osm_tag_passes,
        longitude, latitude
    )
    select
        'osm/' || o.id,
        'osm',
        null,
        o.id,
        o.name,
        o.operator,
        o.street_address,
        o.city,
        o.state,
        o.postal_code,
        o.country,
        o.website,
        o.phone,
        o.tags,
        o.matched_tags,
        o.longitude,
        o.latitude
    from {OSM_TABLE} o
    where not exists (
        select 1 from _curated_to_osm m where m.osm_id = o.id
    );

    analyze {MASTER_TABLE};

    return query
    select
        count(*) filter (where source = 'curated'),
        count(*) filter (where source = 'merged'),
        count(*) filter (where source = 'osm'),
        count(*)
    from {MASTER_TABLE};
end;
$$;
"""
|
|
|
|
|
|
def _non_negative_meters(text: str) -> float:
    """Convert a ``--radius-m`` value, rejecting anything unusable as a radius.

    Raises:
        argparse.ArgumentTypeError: if *text* is not a float, or is negative,
            NaN, or infinite.
    """
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # A single chained comparison rejects negatives, +inf, and NaN (every
    # comparison involving NaN is False).
    if not (0 <= value < float("inf")):
        raise argparse.ArgumentTypeError(
            f"radius must be a finite, non-negative number of meters, got {text!r}"
        )
    return value


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for this script.

    Returns:
        A namespace with ``radius_m`` (spatial match radius in meters,
        defaulting to ``MATCH_RADIUS_M``) and ``recreate`` (True when the
        master table should be dropped and rebuilt first).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--radius-m",
        type=_non_negative_meters,
        default=MATCH_RADIUS_M,
        help=f"Spatial match radius in meters (default: {MATCH_RADIUS_M}).",
    )
    parser.add_argument(
        "--recreate",
        action="store_true",
        help=f"Drop and recreate {MASTER_TABLE} before building.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Create the master table and refresh function, run a refresh, print counts.

    Connection settings come from the PGWEB_HOST, PGWEB_PORT, PGWEB_USER and
    PGWEB_PASSWORD environment variables; the database name is ``DB_NAME``.

    Returns:
        0 after a successful refresh, 1 if a required environment variable is
        missing.  Database errors propagate as exceptions.
    """
    args = parse_args()

    # Report every missing variable at once instead of failing with a bare
    # KeyError on whichever one happens to be read first.
    required_env = ("PGWEB_HOST", "PGWEB_PORT", "PGWEB_USER", "PGWEB_PASSWORD")
    missing = [name for name in required_env if name not in os.environ]
    if missing:
        print(
            f"error: missing required environment variable(s): {', '.join(missing)}",
            file=sys.stderr,
        )
        return 1

    conn = psycopg2.connect(
        host=os.environ["PGWEB_HOST"],
        port=os.environ["PGWEB_PORT"],
        user=os.environ["PGWEB_USER"],
        password=os.environ["PGWEB_PASSWORD"],
        dbname=DB_NAME,
    )
    try:
        # `with conn` wraps everything in one transaction: committed on
        # success, rolled back on error.  It does not close the connection,
        # hence the `finally` below.
        with conn:
            with conn.cursor() as cur:
                cur.execute("create extension if not exists postgis")
                if args.recreate:
                    cur.execute(f"drop table if exists {MASTER_TABLE} cascade")
                cur.execute(CREATE_TABLE_SQL)
                cur.execute(REFRESH_FUNCTION_SQL)
                cur.execute(
                    "select * from public.refresh_master_data_centers(%s)",
                    (args.radius_m,),
                )
                curated, merged, osm_only, total = cur.fetchone()
    finally:
        conn.close()

    print(f"master_data_centers refreshed (radius={args.radius_m} m):")
    print(f"  curated-only rows:           {curated}")
    print(f"  merged rows (curated + OSM): {merged}")
    print(f"  osm-only rows:               {osm_only}")
    print(f"  total:                       {total}")
    return 0
|
|
|
|
|
|
# Run the refresh only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|