del env py
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,398 +0,0 @@
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from zipfile import ZIP_DEFLATED, ZipFile
|
||||
|
||||
import numpy as np
|
||||
|
||||
from pyogrio import (
|
||||
__gdal_version_string__,
|
||||
__version__,
|
||||
list_drivers,
|
||||
)
|
||||
from pyogrio._compat import (
|
||||
HAS_ARROW_API,
|
||||
HAS_ARROW_WRITE_API,
|
||||
HAS_GDAL_GEOS,
|
||||
HAS_PYARROW,
|
||||
HAS_PYPROJ,
|
||||
HAS_SHAPELY,
|
||||
)
|
||||
from pyogrio.core import vsi_rmtree
|
||||
from pyogrio.raw import read, write
|
||||
|
||||
import pytest
|
||||
|
||||
_data_dir = Path(__file__).parent.resolve() / "fixtures"
|
||||
|
||||
# well-supported drivers, keyed by their file extension
DRIVERS = {
    ".fgb": "FlatGeobuf",
    ".geojson": "GeoJSON",
    ".geojsonl": "GeoJSONSeq",
    ".geojsons": "GeoJSONSeq",
    ".gpkg": "GPKG",
    ".shp": "ESRI Shapefile",
}

# reverse mapping: driver name -> extension; for drivers listed under two
# extensions the later one wins (GeoJSONSeq -> ".geojsons")
DRIVER_EXT = {name: extension for extension, name in DRIVERS.items()}

ALL_EXTS = [".fgb", ".geojson", ".geojsonl", ".gpkg", ".shp"]

# first feature ID assigned by each format (GPKG numbers features from 1)
START_FID = dict.fromkeys(
    [".fgb", ".geojson", ".geojsonl", ".geojsons", ".shp"], 0
)
START_FID[".gpkg"] = 1
|
||||
|
||||
|
||||
def pytest_report_header(config):
    """Report pyogrio/GDAL versions and driver support in the pytest header."""
    parts = [
        f"{name}({capability})"
        for name, capability in sorted(list_drivers().items())
    ]
    lines = [
        f"pyogrio {__version__}",
        f"GDAL {__gdal_version_string__}",
        f"Supported drivers: {', '.join(parts)}",
    ]
    return "\n".join(lines)
|
||||
|
||||
|
||||
# marks to skip tests if optional dependencies are not present
requires_arrow_api = pytest.mark.skipif(not HAS_ARROW_API, reason="GDAL>=3.6 required")
requires_pyarrow_api = pytest.mark.skipif(
    not HAS_ARROW_API or not HAS_PYARROW, reason="GDAL>=3.6 and pyarrow required"
)

requires_pyproj = pytest.mark.skipif(not HAS_PYPROJ, reason="pyproj required")

requires_arrow_write_api = pytest.mark.skipif(
    not HAS_ARROW_WRITE_API or not HAS_PYARROW,
    reason="GDAL>=3.8 required for Arrow write API",
)

requires_gdal_geos = pytest.mark.skipif(
    not HAS_GDAL_GEOS, reason="GDAL compiled with GEOS required"
)

requires_shapely = pytest.mark.skipif(not HAS_SHAPELY, reason="Shapely >= 2.0 required")
|
||||
|
||||
|
||||
def prepare_testfile(testfile_path, dst_dir, ext):
    """Convert ``testfile_path`` to format ``ext`` inside ``dst_dir``.

    Returns the original path unchanged when it already has the requested
    extension, and reuses a previously converted file if one exists.
    """
    if ext == testfile_path.suffix:
        return testfile_path

    dst_path = dst_dir / f"{testfile_path.stem}{ext}"
    if dst_path.exists():
        return dst_path

    meta, _, geometry, field_data = read(testfile_path)

    if ext == ".fgb":
        # For .fgb, spatial_index=False to avoid the rows being reordered
        meta["spatial_index"] = False
        # allow mixed Polygons/MultiPolygons type
        meta["geometry_type"] = "Unknown"

    elif ext == ".gpkg":
        # For .gpkg, spatial_index=False to avoid the rows being reordered
        meta["spatial_index"] = False
        meta["geometry_type"] = "MultiPolygon"

    write(dst_path, geometry, field_data, **meta)
    return dst_path
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def data_dir():
    """Return the path to the bundled test fixtures directory."""
    return _data_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def naturalearth_lowres(tmp_path, request):
    """Return the naturalearth_lowres test file, optionally converted.

    Indirect parametrization may supply an extension; default is ".shp".
    """
    ext = getattr(request, "param", ".shp")
    testfile_path = _data_dir / Path("naturalearth_lowres/naturalearth_lowres.shp")

    return prepare_testfile(testfile_path, tmp_path, ext)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", params=ALL_EXTS)
def naturalearth_lowres_all_ext(tmp_path, naturalearth_lowres, request):
    """Parametrized fixture yielding naturalearth_lowres in every supported format."""
    return prepare_testfile(naturalearth_lowres, tmp_path, request.param)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def naturalearth_lowres_vsi(tmp_path, naturalearth_lowres):
    """Wrap naturalearth_lowres as a zip file for VSI tests.

    Returns a tuple (zip path on disk, /vsizip/ path for GDAL).
    """
    path = tmp_path / f"{naturalearth_lowres.name}.zip"
    with ZipFile(path, mode="w", compression=ZIP_DEFLATED, compresslevel=5) as out:
        # bundle all shapefile sidecar files under their original names
        for ext in ["dbf", "prj", "shp", "shx", "cpg"]:
            filename = f"{naturalearth_lowres.stem}.{ext}"
            out.write(naturalearth_lowres.parent / filename, filename)

    return path, f"/vsizip/{path}/{naturalearth_lowres.name}"
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def naturalearth_lowres_vsimem(naturalearth_lowres):
    """Write naturalearth_lowres to a vsimem file for VSI tests.

    Yields the in-memory GPKG path and removes it on teardown.
    """
    meta, _, geometry, field_data = read(naturalearth_lowres)
    name = f"pyogrio_fixture_{naturalearth_lowres.stem}"
    dst_path = Path(f"/vsimem/{name}/{name}.gpkg")
    # spatial_index=False keeps row order stable across formats
    meta["spatial_index"] = False
    meta["geometry_type"] = "MultiPolygon"

    write(dst_path, geometry, field_data, layer="naturalearth_lowres", **meta)
    yield dst_path

    # clean up the in-memory directory after the test
    vsi_rmtree(dst_path.parent)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def line_zm_file():
    """GPKG with a LineString ZM layer (downgraded to LineString Z on read)."""
    return _data_dir / "line_zm.gpkg"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def curve_file():
    """GPKG with a Curve layer (downgraded to LineString on read)."""
    return _data_dir / "curve.gpkg"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def curve_polygon_file():
    """GPKG with a CurvePolygon layer (downgraded to Polygon on read)."""
    return _data_dir / "curvepolygon.gpkg"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def multisurface_file():
    """GPKG with a MultiSurface layer (downgraded to MultiPolygon on read)."""
    return _data_dir / "multisurface.gpkg"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def test_gpkg_nulls():
    """GPKG containing a row of null values for every column dtype."""
    return _data_dir / "test_gpkg_nulls.gpkg"
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def no_geometry_file(tmp_path):
    """Create and return a GPKG layer that does not include geometry."""
    # create a GPKG layer that does not include geometry
    filename = tmp_path / "test_no_geometry.gpkg"
    write(
        filename,
        layer="no_geometry",
        geometry=None,
        field_data=[np.array(["a", "b", "c"])],
        fields=["col"],
    )

    return filename
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def list_field_values_file(tmp_path):
    """Create and return a GeoJSON file with list-typed property values."""
    # Create a GeoJSON file with list values in a property
    list_geojson = """{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": { "int64": 1, "list_int64": [0, 1] },
            "geometry": { "type": "Point", "coordinates": [0, 2] }
        },
        {
            "type": "Feature",
            "properties": { "int64": 2, "list_int64": [2, 3] },
            "geometry": { "type": "Point", "coordinates": [1, 2] }
        },
        {
            "type": "Feature",
            "properties": { "int64": 3, "list_int64": [4, 5] },
            "geometry": { "type": "Point", "coordinates": [2, 2] }
        },
        {
            "type": "Feature",
            "properties": { "int64": 4, "list_int64": [6, 7] },
            "geometry": { "type": "Point", "coordinates": [3, 2] }
        },
        {
            "type": "Feature",
            "properties": { "int64": 5, "list_int64": [8, 9] },
            "geometry": { "type": "Point", "coordinates": [4, 2] }
        }
    ]
}"""

    filename = tmp_path / "test_ogr_types_list.geojson"
    with open(filename, "w") as f:
        _ = f.write(list_geojson)

    return filename
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def nested_geojson_file(tmp_path):
    """Create and return a GeoJSON file with nested properties."""
    # create GeoJSON file with nested properties
    nested_geojson = """{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [0, 0]
            },
            "properties": {
                "top_level": "A",
                "intermediate_level": {
                    "bottom_level": "B"
                }
            }
        }
    ]
}"""

    filename = tmp_path / "test_nested.geojson"
    with open(filename, "w") as f:
        _ = f.write(nested_geojson)

    return filename
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def datetime_file(tmp_path):
    """Create and return a GeoJSON file with millisecond-precision datetimes."""
    # create GeoJSON file with millisecond precision
    datetime_geojson = """{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": { "col": "2020-01-01T09:00:00.123" },
            "geometry": { "type": "Point", "coordinates": [1, 1] }
        },
        {
            "type": "Feature",
            "properties": { "col": "2020-01-01T10:00:00" },
            "geometry": { "type": "Point", "coordinates": [2, 2] }
        }
    ]
}"""

    filename = tmp_path / "test_datetime.geojson"
    with open(filename, "w") as f:
        _ = f.write(datetime_geojson)

    return filename
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def datetime_tz_file(tmp_path):
    """Create and return a GeoJSON file with timezone-aware datetimes."""
    # create GeoJSON file with datetimes with timezone
    datetime_tz_geojson = """{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": { "datetime_col": "2020-01-01T09:00:00.123-05:00" },
            "geometry": { "type": "Point", "coordinates": [1, 1] }
        },
        {
            "type": "Feature",
            "properties": { "datetime_col": "2020-01-01T10:00:00-05:00" },
            "geometry": { "type": "Point", "coordinates": [2, 2] }
        }
    ]
}"""

    filename = tmp_path / "test_datetime_tz.geojson"
    with open(filename, "w") as f:
        # discard the return value explicitly, consistent with the sibling
        # GeoJSON fixtures in this file
        _ = f.write(datetime_tz_geojson)

    return filename
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def geojson_bytes(tmp_path):
    """Extracts first 3 records from naturalearth_lowres and writes to GeoJSON,
    returning bytes"""
    meta, _, geometry, field_data = read(
        _data_dir / Path("naturalearth_lowres/naturalearth_lowres.shp"), max_features=3
    )

    filename = tmp_path / "test.geojson"
    write(filename, geometry, field_data, **meta)

    with open(filename, "rb") as f:
        bytes_buffer = f.read()

    return bytes_buffer
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def geojson_filelike(tmp_path):
    """Extracts first 3 records from naturalearth_lowres and writes to GeoJSON,
    returning open file handle"""
    meta, _, geometry, field_data = read(
        _data_dir / Path("naturalearth_lowres/naturalearth_lowres.shp"), max_features=3
    )

    filename = tmp_path / "test.geojson"
    write(filename, geometry, field_data, layer="test", **meta)

    # yield inside the context manager so the handle stays open for the test
    with open(filename, "rb") as f:
        yield f
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def nonseekable_bytes(tmp_path):
    """Return a non-seekable byte stream containing a one-point GeoJSON."""

    # mock a non-seekable byte stream, such as a zstandard handle
    class NonSeekableBytesIO(BytesIO):
        def seekable(self):
            return False

        def seek(self, *args, **kwargs):
            raise OSError("cannot seek")

    # wrap GeoJSON into a non-seekable BytesIO
    geojson = """{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": { },
            "geometry": { "type": "Point", "coordinates": [1, 1] }
        }
    ]
}"""

    return NonSeekableBytesIO(geojson.encode("UTF-8"))
|
||||
|
||||
|
||||
@pytest.fixture(
    scope="session",
    params=[
        # Japanese
        ("CP932", "ホ"),
        # Chinese
        ("CP936", "中文"),
        # Central European
        ("CP1250", "Đ"),
        # Latin 1 / Western European
        ("CP1252", "ÿ"),
        # Greek
        ("CP1253", "Φ"),
        # Arabic
        ("CP1256", "ش"),
    ],
)
def encoded_text(request):
    """Return tuple with encoding name and very short sample text in that encoding

    NOTE: it was determined through testing that code pages for MS-DOS do not
    consistently work across all Python installations (in particular, fail with conda),
    but ANSI code pages appear to work properly.
    """
    return request.param
|
||||
@@ -1,108 +0,0 @@
|
||||
# Test datasets
|
||||
|
||||
## Obtaining / creating test datasets
|
||||
|
||||
If a test dataset can be created in code, do that instead. If it is used in a
|
||||
single test, create the test dataset as part of that test. If it is used in
|
||||
more than a single test, add it to `pyogrio/tests/conftest.py` instead, as a
|
||||
function-scoped test fixture.
|
||||
|
||||
If you need to obtain 3rd party test files:
|
||||
|
||||
- add a section below that describes the source location and processing steps
|
||||
to derive that dataset
|
||||
- make sure the license is compatible with including in Pyogrio (public domain or open-source)
|
||||
and record that license below
|
||||
|
||||
Please keep the test files no larger than necessary to use in tests.
|
||||
|
||||
## Included test datasets
|
||||
|
||||
### Natural Earth lowres
|
||||
|
||||
`naturalearth_lowres.shp` was copied from GeoPandas.
|
||||
|
||||
License: public domain
|
||||
|
||||
### GPKG test dataset with null values
|
||||
|
||||
`test_gpkg_nulls.gpkg` was created using Fiona backend to GeoPandas:
|
||||
|
||||
```
|
||||
from collections import OrderedDict
|
||||
|
||||
import fiona
|
||||
import geopandas as gp
|
||||
import numpy as np
|
||||
from pyogrio import write_dataframe
|
||||
|
||||
filename = "test_gpkg_nulls.gpkg"
|
||||
|
||||
df = gp.GeoDataFrame(
|
||||
{
|
||||
"col_bool": np.array([True, False, True], dtype="bool"),
|
||||
"col_int8": np.array([1, 2, 3], dtype="int8"),
|
||||
"col_int16": np.array([1, 2, 3], dtype="int16"),
|
||||
"col_int32": np.array([1, 2, 3], dtype="int32"),
|
||||
"col_int64": np.array([1, 2, 3], dtype="int64"),
|
||||
"col_uint8": np.array([1, 2, 3], dtype="uint8"),
|
||||
"col_uint16": np.array([1, 2, 3], dtype="uint16"),
|
||||
"col_uint32": np.array([1, 2, 3], dtype="uint32"),
|
||||
"col_uint64": np.array([1, 2, 3], dtype="uint64"),
|
||||
"col_float32": np.array([1.5, 2.5, 3.5], dtype="float32"),
|
||||
"col_float64": np.array([1.5, 2.5, 3.5], dtype="float64"),
|
||||
},
|
||||
geometry=gp.points_from_xy([0, 1, 2], [0, 1, 2]),
|
||||
crs="EPSG:4326",
|
||||
)
|
||||
|
||||
write_dataframe(df, filename)
|
||||
|
||||
# construct row with null values
|
||||
# Note: np.nan can only be used for float values
|
||||
null_row = {
|
||||
"type": "Feature",
|
||||
"id": 4,
|
||||
"properties": OrderedDict(
|
||||
[
|
||||
("col_bool", None),
|
||||
("col_int8", None),
|
||||
("col_int16", None),
|
||||
("col_int32", None),
|
||||
("col_int64", None),
|
||||
("col_uint8", None),
|
||||
("col_uint16", None),
|
||||
("col_uint32", None),
|
||||
("col_uint64", None),
|
||||
("col_float32", np.nan),
|
||||
("col_float64", np.nan),
|
||||
]
|
||||
),
|
||||
"geometry": {"type": "Point", "coordinates": (4.0, 4.0)},
|
||||
}
|
||||
|
||||
# append row with nulls to GPKG
|
||||
with fiona.open(filename, "a") as c:
|
||||
c.write(null_row)
|
||||
```
|
||||
|
||||
NOTE: Reading boolean values into GeoPandas using Fiona backend treats those
|
||||
values as `None` and column dtype as `object`; Pyogrio treats those values as
|
||||
`np.nan` and column dtype as `float64`.
|
||||
|
||||
License: same as Pyogrio
|
||||
|
||||
### OSM PBF test
|
||||
|
||||
This was downloaded from https://github.com/openstreetmap/OSM-binary/blob/master/resources/sample.pbf
|
||||
|
||||
License: [Open Data Commons Open Database License (ODbL)](https://opendatacommons.org/licenses/odbl/)
|
||||
|
||||
### Test files for geometry types that are downgraded on read
|
||||
|
||||
`line_zm.gpkg` was created using QGIS to digitize a LineString GPKG layer with Z and M enabled. Downgraded to LineString Z on read.
|
||||
`curve.gpkg` was created using QGIS to digitize a Curve GPKG layer. Downgraded to LineString on read.
|
||||
`curvepolygon.gpkg` was created using QGIS to digitize a CurvePolygon GPKG layer. Downgraded to Polygon on read.
|
||||
`multisurface.gpkg` was created using QGIS to digitize a MultiSurface GPKG layer. Downgraded to MultiPolygon on read.
|
||||
|
||||
License: same as Pyogrio
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1 +0,0 @@
|
||||
ISO-8859-1
|
||||
Binary file not shown.
@@ -1 +0,0 @@
|
||||
GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137,298.257223563]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]]
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
File diff suppressed because it is too large
Load Diff
@@ -1,678 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from numpy import allclose, array_equal
|
||||
|
||||
from pyogrio import (
|
||||
__gdal_geos_version__,
|
||||
__gdal_version__,
|
||||
detect_write_driver,
|
||||
get_gdal_config_option,
|
||||
get_gdal_data_path,
|
||||
list_drivers,
|
||||
list_layers,
|
||||
read_bounds,
|
||||
read_info,
|
||||
set_gdal_config_options,
|
||||
vsi_listtree,
|
||||
vsi_rmtree,
|
||||
vsi_unlink,
|
||||
)
|
||||
from pyogrio._compat import GDAL_GE_38
|
||||
from pyogrio._env import GDALEnv
|
||||
from pyogrio.errors import DataLayerError, DataSourceError
|
||||
from pyogrio.raw import read, write
|
||||
from pyogrio.tests.conftest import START_FID, prepare_testfile, requires_shapely
|
||||
|
||||
import pytest
|
||||
|
||||
with GDALEnv():
|
||||
# NOTE: this must be AFTER above imports, which init the GDAL and PROJ data
|
||||
# search paths
|
||||
from pyogrio._ogr import has_gdal_data, has_proj_data, ogr_driver_supports_write
|
||||
|
||||
|
||||
try:
|
||||
import shapely
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def test_gdal_data():
    """Verify the GDAL data files can be located."""
    # test will fail if GDAL data files cannot be found, indicating an
    # installation error
    assert has_gdal_data()
|
||||
|
||||
|
||||
def test_proj_data():
    """Verify the PROJ data files can be located."""
    # test will fail if PROJ data files cannot be found, indicating an
    # installation error
    assert has_proj_data()
|
||||
|
||||
|
||||
def test_get_gdal_data_path():
    """get_gdal_data_path returns a string path when GDAL data is found."""
    # test will fail if the function returns None, which means that GDAL
    # cannot find data files, indicating an installation error
    assert isinstance(get_gdal_data_path(), str)
|
||||
|
||||
|
||||
def test_gdal_geos_version():
    """GEOS version is either unavailable (None) or a version tuple."""
    assert __gdal_geos_version__ is None or isinstance(__gdal_geos_version__, tuple)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path,expected",
    [
        ("test.shp", "ESRI Shapefile"),
        ("test.shp.zip", "ESRI Shapefile"),
        ("test.geojson", "GeoJSON"),
        ("test.geojsonl", "GeoJSONSeq"),
        ("test.gpkg", "GPKG"),
        pytest.param(
            "test.gpkg.zip",
            "GPKG",
            marks=pytest.mark.skipif(
                __gdal_version__ < (3, 7, 0),
                reason="writing *.gpkg.zip requires GDAL >= 3.7.0",
            ),
        ),
        # postgres can be detected by prefix instead of extension
        pytest.param(
            "PG:dbname=test",
            "PostgreSQL",
            marks=pytest.mark.skipif(
                "PostgreSQL" not in list_drivers(),
                reason="PostgreSQL path test requires PostgreSQL driver",
            ),
        ),
    ],
)
def test_detect_write_driver(path, expected):
    """Write driver is inferred from path extension or dataset prefix."""
    assert detect_write_driver(path) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path",
    [
        "test.svg",  # only supports read
        "test.",  # not a valid extension
        "test",  # no extension or prefix
        "test.foo",  # not a valid extension
        "FOO:test",  # not a valid prefix
    ],
)
def test_detect_write_driver_unsupported(path):
    """Paths with no writable driver raise a ValueError."""
    with pytest.raises(ValueError, match="Could not infer driver from path"):
        detect_write_driver(path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path", ["test.xml", "test.txt"])
def test_detect_write_driver_multiple_unsupported(path):
    """Extensions claimed by several drivers raise an ambiguity error."""
    with pytest.raises(ValueError, match="multiple drivers are available"):
        detect_write_driver(path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "driver,expected",
    [
        # drivers known to be well-supported by pyogrio
        ("ESRI Shapefile", True),
        ("GeoJSON", True),
        ("GeoJSONSeq", True),
        ("GPKG", True),
        # drivers not supported for write by GDAL
        ("HTTP", False),
        ("OAPIF", False),
    ],
)
def test_ogr_driver_supports_write(driver, expected):
    """Write capability is reported correctly per driver."""
    assert ogr_driver_supports_write(driver) == expected
|
||||
|
||||
|
||||
def test_list_drivers():
    """Core drivers are present and read/write filters narrow the results."""
    all_drivers = list_drivers()

    # verify that the core drivers are present
    for name in ("ESRI Shapefile", "GeoJSON", "GeoJSONSeq", "GPKG", "OpenFileGDB"):
        assert name in all_drivers

        expected_capability = "rw"
        # OpenFileGDB gained write support in GDAL 3.6
        if name == "OpenFileGDB" and __gdal_version__ < (3, 6, 0):
            expected_capability = "r"

        assert all_drivers[name] == expected_capability

    drivers = list_drivers(read=True)
    expected = {k: v for k, v in all_drivers.items() if v.startswith("r")}
    assert len(drivers) == len(expected)

    drivers = list_drivers(write=True)
    expected = {k: v for k, v in all_drivers.items() if v.endswith("w")}
    assert len(drivers) == len(expected)

    drivers = list_drivers(read=True, write=True)
    expected = {
        k: v for k, v in all_drivers.items() if v.startswith("r") and v.endswith("w")
    }
    assert len(drivers) == len(expected)
|
||||
|
||||
|
||||
def test_list_layers(
    naturalearth_lowres,
    naturalearth_lowres_vsi,
    naturalearth_lowres_vsimem,
    line_zm_file,
    curve_file,
    curve_polygon_file,
    multisurface_file,
    no_geometry_file,
):
    """Layer names and geometry types are listed for all dataset kinds."""
    assert array_equal(
        list_layers(naturalearth_lowres), [["naturalearth_lowres", "Polygon"]]
    )

    assert array_equal(
        list_layers(naturalearth_lowres_vsi[1]), [["naturalearth_lowres", "Polygon"]]
    )

    assert array_equal(
        list_layers(naturalearth_lowres_vsimem),
        [["naturalearth_lowres", "MultiPolygon"]],
    )

    # Measured 3D is downgraded to plain 3D during read
    # Make sure this warning is raised
    with pytest.warns(
        UserWarning, match=r"Measured \(M\) geometry types are not supported"
    ):
        assert array_equal(list_layers(line_zm_file), [["line_zm", "LineString Z"]])

    # Curve / surface types are downgraded to plain types
    assert array_equal(list_layers(curve_file), [["curve", "LineString"]])
    assert array_equal(list_layers(curve_polygon_file), [["curvepolygon", "Polygon"]])
    assert array_equal(
        list_layers(multisurface_file), [["multisurface", "MultiPolygon"]]
    )

    # Make sure that nonspatial layer has None for geometry
    assert array_equal(list_layers(no_geometry_file), [["no_geometry", None]])
|
||||
|
||||
|
||||
def test_list_layers_bytes(geojson_bytes):
    """Layers can be listed from an in-memory bytes buffer."""
    layers = list_layers(geojson_bytes)

    assert layers.shape == (1, 2)
    assert layers[0, 0] == "test"
|
||||
|
||||
|
||||
def test_list_layers_nonseekable_bytes(nonseekable_bytes):
    """Layers can be listed from a non-seekable byte stream."""
    layers = list_layers(nonseekable_bytes)

    assert layers.shape == (1, 2)
    assert layers[0, 1] == "Point"
|
||||
|
||||
|
||||
def test_list_layers_filelike(geojson_filelike):
    """Layers can be listed from an open file handle."""
    layers = list_layers(geojson_filelike)

    assert layers.shape == (1, 2)
    assert layers[0, 0] == "test"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "testfile",
    ["naturalearth_lowres", "naturalearth_lowres_vsimem", "naturalearth_lowres_vsi"],
)
def test_read_bounds(testfile, request):
    """read_bounds returns FIDs and per-feature bounds for each backend."""
    path = request.getfixturevalue(testfile)
    # the vsi fixture returns (zip path, vsi path); use the vsi path
    path = path if not isinstance(path, tuple) else path[1]

    fids, bounds = read_bounds(path)
    assert fids.shape == (177,)
    assert bounds.shape == (4, 177)
    assert fids[0] == START_FID[Path(path).suffix]
    # Fiji; wraps antimeridian
    assert allclose(bounds[:, 0], [-180.0, -18.28799, 180.0, -16.02088])
|
||||
|
||||
|
||||
def test_read_bounds_bytes(geojson_bytes):
    """read_bounds works on an in-memory bytes buffer."""
    fids, bounds = read_bounds(geojson_bytes)
    assert fids.shape == (3,)
    assert bounds.shape == (4, 3)
    assert allclose(bounds[:, 0], [-180.0, -18.28799, 180.0, -16.02088])
|
||||
|
||||
|
||||
def test_read_bounds_nonseekable_bytes(nonseekable_bytes):
    """read_bounds works on a non-seekable byte stream."""
    fids, bounds = read_bounds(nonseekable_bytes)
    assert fids.shape == (1,)
    assert bounds.shape == (4, 1)
    assert allclose(bounds[:, 0], [1, 1, 1, 1])
|
||||
|
||||
|
||||
def test_read_bounds_filelike(geojson_filelike):
    """read_bounds works on an open file handle."""
    fids, bounds = read_bounds(geojson_filelike)
    assert fids.shape == (3,)
    assert bounds.shape == (4, 3)
    assert allclose(bounds[:, 0], [-180.0, -18.28799, 180.0, -16.02088])
|
||||
|
||||
|
||||
def test_read_bounds_max_features(naturalearth_lowres):
    """max_features limits the number of bounds returned."""
    bounds = read_bounds(naturalearth_lowres, max_features=2)[1]
    assert bounds.shape == (4, 2)
|
||||
|
||||
|
||||
def test_read_bounds_unspecified_layer_warning(data_dir):
    """Reading a multi-layer file without specifying a layer gives a warning."""
    with pytest.warns(UserWarning, match="More than one layer found "):
        read_bounds(data_dir / "sample.osm.pbf")
|
||||
|
||||
|
||||
def test_read_bounds_negative_max_features(naturalearth_lowres):
    """A negative max_features is rejected."""
    with pytest.raises(ValueError, match="'max_features' must be >= 0"):
        read_bounds(naturalearth_lowres, max_features=-1)
|
||||
|
||||
|
||||
def test_read_bounds_skip_features(naturalearth_lowres):
    """skip_features offsets both the FIDs and the bounds returned."""
    # 11th feature of the full read should be the first after skipping 10
    expected_bounds = read_bounds(naturalearth_lowres, max_features=11)[1][:, 10]
    fids, bounds = read_bounds(naturalearth_lowres, skip_features=10)
    assert bounds.shape == (4, 167)
    assert allclose(bounds[:, 0], expected_bounds)
    assert fids[0] == 10
|
||||
|
||||
|
||||
def test_read_bounds_negative_skip_features(naturalearth_lowres):
    """A negative skip_features is rejected."""
    with pytest.raises(ValueError, match="'skip_features' must be >= 0"):
        read_bounds(naturalearth_lowres, skip_features=-1)
|
||||
|
||||
|
||||
def test_read_bounds_where_invalid(naturalearth_lowres_all_ext):
    """An unparseable where clause raises for every format."""
    with pytest.raises(ValueError, match="Invalid SQL"):
        read_bounds(naturalearth_lowres_all_ext, where="invalid")
|
||||
|
||||
|
||||
def test_read_bounds_where(naturalearth_lowres):
    """A where clause filters to the matching feature (Canada)."""
    fids, bounds = read_bounds(naturalearth_lowres, where="iso_a3 = 'CAN'")
    assert fids.shape == (1,)
    assert bounds.shape == (4, 1)
    assert fids[0] == 3
    assert allclose(bounds[:, 0], [-140.99778, 41.675105, -52.648099, 83.23324])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("bbox", [(1,), (1, 2), (1, 2, 3)])
def test_read_bounds_bbox_invalid(naturalearth_lowres, bbox):
    """A bbox with fewer than 4 values is rejected."""
    with pytest.raises(ValueError, match="Invalid bbox"):
        read_bounds(naturalearth_lowres, bbox=bbox)
|
||||
|
||||
|
||||
def test_read_bounds_bbox(naturalearth_lowres_all_ext):
    """bbox filtering returns matching features with format-adjusted FIDs."""
    # should return no features
    fids, bounds = read_bounds(
        naturalearth_lowres_all_ext, bbox=(0, 0, 0.00001, 0.00001)
    )

    assert fids.shape == (0,)
    assert bounds.shape == (4, 0)

    fids, bounds = read_bounds(naturalearth_lowres_all_ext, bbox=(-85, 8, -80, 10))

    assert fids.shape == (2,)
    fids_expected = np.array([33, 34])  # PAN, CRI
    fids_expected += START_FID[naturalearth_lowres_all_ext.suffix]
    assert array_equal(fids, fids_expected)

    assert bounds.shape == (4, 2)
    assert allclose(
        bounds.T,
        [
            [-82.96578305, 7.22054149, -77.24256649, 9.61161001],
            [-85.94172543, 8.22502798, -82.54619626, 11.21711925],
        ],
    )
|
||||
|
||||
|
||||
@requires_shapely
@pytest.mark.parametrize(
    "mask",
    [
        {"type": "Point", "coordinates": [0, 0]},
        '{"type": "Point", "coordinates": [0, 0]}',
        "invalid",
    ],
)
def test_read_bounds_mask_invalid(naturalearth_lowres, mask):
    """Non-Shapely mask values (GeoJSON dict/str, junk) are rejected."""
    with pytest.raises(ValueError, match="'mask' parameter must be a Shapely geometry"):
        read_bounds(naturalearth_lowres, mask=mask)
|
||||
|
||||
|
||||
@requires_shapely
def test_read_bounds_bbox_mask_invalid(naturalearth_lowres):
    """bbox and mask are mutually exclusive."""
    with pytest.raises(ValueError, match="cannot set both 'bbox' and 'mask'"):
        read_bounds(
            naturalearth_lowres, bbox=(-85, 8, -80, 10), mask=shapely.Point(-105, 55)
        )
|
||||
|
||||
|
||||
@requires_shapely
@pytest.mark.parametrize(
    "mask,expected",
    [
        ("POINT (-105 55)", [3]),
        ("POLYGON ((-80 8, -80 10, -85 10, -85 8, -80 8))", [33, 34]),
        (
            """POLYGON ((
                6.101929 50.97085,
                5.773002 50.906611,
                5.593156 50.642649,
                6.059271 50.686052,
                6.374064 50.851481,
                6.101929 50.97085
            ))""",
            [121, 129, 130],
        ),
        (
            """GEOMETRYCOLLECTION (
                POINT (-7.7 53),
                POLYGON ((-80 8, -80 10, -85 10, -85 8, -80 8))
            )""",
            [33, 34, 133],
        ),
    ],
)
def test_read_bounds_mask(naturalearth_lowres_all_ext, mask, expected):
    """Geometry mask filters features; FIDs are offset by the format's start FID."""
    mask = shapely.from_wkt(mask)

    fids = read_bounds(naturalearth_lowres_all_ext, mask=mask)[0]

    fids_expected = np.array(expected) + START_FID[naturalearth_lowres_all_ext.suffix]
    assert array_equal(fids, fids_expected)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
    __gdal_version__ < (3, 4, 0),
    reason="Cannot determine if GEOS is present or absent for GDAL < 3.4",
)
def test_read_bounds_bbox_intersects_vs_envelope_overlaps(naturalearth_lowres_all_ext):
    """bbox semantics depend on GEOS: true intersection vs. envelope overlap."""
    # If GEOS is present and used by GDAL, bbox filter will be based on intersection
    # of bbox and actual geometries; if GEOS is absent or not used by GDAL, it
    # will be based on overlap of bounding boxes instead
    fids, _ = read_bounds(naturalearth_lowres_all_ext, bbox=(-140, 20, -100, 45))

    if __gdal_geos_version__ is None:
        # bboxes for CAN, RUS overlap but do not intersect geometries
        assert fids.shape == (4,)
        fids_expected = np.array([3, 4, 18, 27])  # CAN, USA, RUS, MEX
        fids_expected += START_FID[naturalearth_lowres_all_ext.suffix]
        assert array_equal(fids, fids_expected)

    else:
        assert fids.shape == (2,)
        fids_expected = np.array([4, 27])  # USA, MEX
        fids_expected += START_FID[naturalearth_lowres_all_ext.suffix]
        assert array_equal(fids, fids_expected)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("naturalearth_lowres", [".shp", ".gpkg"], indirect=True)
def test_read_info(naturalearth_lowres):
    """read_info returns correct metadata for shapefile and GPKG variants."""
    meta = read_info(naturalearth_lowres)

    assert meta["layer_name"] == "naturalearth_lowres"
    assert meta["crs"] == "EPSG:4326"
    assert meta["encoding"] == "UTF-8"
    assert meta["fields"].shape == (5,)
    assert meta["dtypes"].tolist() == ["int64", "object", "object", "object", "float64"]
    assert meta["features"] == 177
    assert allclose(meta["total_bounds"], (-180, -90, 180, 83.64513))
    assert meta["capabilities"]["random_read"] is True
    assert meta["capabilities"]["fast_spatial_filter"] is False
    assert meta["capabilities"]["fast_feature_count"] is True
    assert meta["capabilities"]["fast_total_bounds"] is True

    if naturalearth_lowres.suffix == ".gpkg":
        assert meta["fid_column"] == "fid"
        assert meta["geometry_name"] == "geom"
        assert meta["geometry_type"] == "MultiPolygon"
        assert meta["driver"] == "GPKG"
        if GDAL_GE_38:
            # this capability is only True for GPKG if GDAL >= 3.8
            assert meta["capabilities"]["fast_set_next_by_index"] is True
    elif naturalearth_lowres.suffix == ".shp":
        # fid_column == "" for formats where fid is not physically stored
        assert meta["fid_column"] == ""
        # geometry_name == "" for formats where geometry column name cannot be
        # customized
        assert meta["geometry_name"] == ""
        assert meta["geometry_type"] == "Polygon"
        assert meta["driver"] == "ESRI Shapefile"
        assert meta["capabilities"]["fast_set_next_by_index"] is True
    else:
        raise ValueError(f"test not implemented for ext {naturalearth_lowres.suffix}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "testfile", ["naturalearth_lowres_vsimem", "naturalearth_lowres_vsi"]
)
def test_read_info_vsi(testfile, request):
    """read_info works against VSI (in-memory and zipped) paths."""
    path = request.getfixturevalue(testfile)
    # naturalearth_lowres_vsi yields (zip path, vsi path); use the vsi path
    path = path if not isinstance(path, tuple) else path[1]

    meta = read_info(path)

    assert meta["fields"].shape == (5,)
    assert meta["features"] == 177
|
||||
|
||||
|
||||
def test_read_info_bytes(geojson_bytes):
    """read_info accepts an in-memory bytes buffer."""
    meta = read_info(geojson_bytes)

    assert meta["fields"].shape == (5,)
    assert meta["features"] == 3
|
||||
|
||||
|
||||
def test_read_info_nonseekable_bytes(nonseekable_bytes):
    """read_info accepts a non-seekable bytes stream."""
    meta = read_info(nonseekable_bytes)

    assert meta["fields"].shape == (0,)
    assert meta["features"] == 1
|
||||
|
||||
|
||||
def test_read_info_filelike(geojson_filelike):
    """read_info accepts an open file-like object."""
    meta = read_info(geojson_filelike)

    assert meta["fields"].shape == (5,)
    assert meta["features"] == 3
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "dataset_kwargs,fields",
    [
        ({}, ["top_level", "intermediate_level"]),
        # the open option is accepted in upper case, lower case, and with a
        # bool value
        (
            {"FLATTEN_NESTED_ATTRIBUTES": "YES"},
            ["top_level", "intermediate_level_bottom_level"],
        ),
        (
            {"flatten_nested_attributes": "yes"},
            ["top_level", "intermediate_level_bottom_level"],
        ),
        (
            {"flatten_nested_attributes": True},
            ["top_level", "intermediate_level_bottom_level"],
        ),
    ],
)
def test_read_info_dataset_kwargs(nested_geojson_file, dataset_kwargs, fields):
    """Dataset open options are forwarded to GDAL and affect the field list."""
    meta = read_info(nested_geojson_file, **dataset_kwargs)
    assert meta["fields"].tolist() == fields
|
||||
|
||||
|
||||
def test_read_info_invalid_dataset_kwargs(naturalearth_lowres):
    """An unsupported open option raises a RuntimeWarning instead of failing."""
    with pytest.warns(RuntimeWarning, match="does not support open option INVALID"):
        read_info(naturalearth_lowres, INVALID="YES")
|
||||
|
||||
|
||||
def test_read_info_force_feature_count_exception(data_dir):
    """Forcing a feature count on the OSM sample raises DataLayerError."""
    with pytest.raises(DataLayerError, match="Could not iterate over features"):
        read_info(data_dir / "sample.osm.pbf", layer="lines", force_feature_count=True)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "layer, force, expected",
    [
        ("points", False, -1),
        ("points", True, 8),
        ("lines", False, -1),
        ("lines", True, 36),
    ],
)
def test_read_info_force_feature_count(data_dir, layer, force, expected):
    """Feature count is -1 unless forced for drivers without fast counting."""
    # the sample OSM file has non-increasing node IDs which causes the default
    # custom indexing to raise an exception iterating over features
    meta = read_info(
        data_dir / "sample.osm.pbf",
        layer=layer,
        force_feature_count=force,
        USE_CUSTOM_INDEXING=False,
    )
    assert meta["features"] == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "force_total_bounds, expected_total_bounds",
    [(True, (-180.0, -90.0, 180.0, 83.64513)), (False, None)],
)
def test_read_info_force_total_bounds(
    tmp_path, naturalearth_lowres, force_total_bounds, expected_total_bounds
):
    """total_bounds is only computed for GeoJSONSeq when explicitly forced."""
    # GeoJSONSeq has no fast total-bounds capability, so bounds are None
    # unless force_total_bounds is passed
    geojson_path = prepare_testfile(
        naturalearth_lowres, dst_dir=tmp_path, ext=".geojsonl"
    )

    info = read_info(geojson_path, force_total_bounds=force_total_bounds)
    if expected_total_bounds is not None:
        assert allclose(info["total_bounds"], expected_total_bounds)
    else:
        assert info["total_bounds"] is None
|
||||
|
||||
|
||||
def test_read_info_unspecified_layer_warning(data_dir):
    """Reading a multi-layer file without specifying a layer gives a warning."""
    with pytest.warns(UserWarning, match="More than one layer found "):
        read_info(data_dir / "sample.osm.pbf")
|
||||
|
||||
|
||||
def test_read_info_without_geometry(no_geometry_file):
    """A layer with no geometry column reports total_bounds as None."""
    assert read_info(no_geometry_file)["total_bounds"] is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "name,value,expected",
    [
        # string and bool forms are both accepted and normalized to bool
        ("CPL_DEBUG", "ON", True),
        ("CPL_DEBUG", True, True),
        ("CPL_DEBUG", "OFF", False),
        ("CPL_DEBUG", False, False),
    ],
)
def test_set_config_options(name, value, expected):
    """set_gdal_config_options round-trips through get_gdal_config_option."""
    set_gdal_config_options({name: value})
    actual = get_gdal_config_option(name)
    assert actual == expected
|
||||
|
||||
|
||||
def test_reset_config_options():
    """Setting an option to None clears it from GDAL's config."""
    set_gdal_config_options({"foo": "bar"})
    assert get_gdal_config_option("foo") == "bar"

    set_gdal_config_options({"foo": None})
    assert get_gdal_config_option("foo") is None
|
||||
|
||||
|
||||
def test_error_handling(capfd):
    """GDAL failures become Python exceptions without leaking to stderr."""
    # an operation that triggers a GDAL Failure
    # -> error translated into Python exception + not printed to stderr
    with pytest.raises(DataSourceError, match="No such file or directory"):
        read_info("non-existent.shp")

    assert capfd.readouterr().err == ""
|
||||
|
||||
|
||||
def test_error_handling_warning(capfd, naturalearth_lowres):
    """GDAL warnings become Python warnings without leaking to stderr."""
    # an operation that triggers a GDAL Warning
    # -> translated into a Python warning + not printed to stderr
    with pytest.warns(RuntimeWarning, match="does not support open option INVALID"):
        read_info(naturalearth_lowres, INVALID="YES")

    assert capfd.readouterr().err == ""
|
||||
|
||||
|
||||
def test_vsimem_listtree_rmtree_unlink(naturalearth_lowres):
    """Test all basic functionalities of file handling in /vsimem/."""
    # Prepare test data in /vsimem
    meta, _, geometry, field_data = read(naturalearth_lowres)
    meta["spatial_index"] = False
    meta["geometry_type"] = "MultiPolygon"
    test_file_path = Path("/vsimem/pyogrio_test_naturalearth_lowres.gpkg")
    test_dir_path = Path(f"/vsimem/pyogrio_dir_test/{naturalearth_lowres.stem}.gpkg")

    write(test_file_path, geometry, field_data, **meta)
    write(test_dir_path, geometry, field_data, **meta)

    # Check if everything was created properly with listtree
    files = vsi_listtree("/vsimem/")
    assert test_file_path.as_posix() in files
    assert test_dir_path.as_posix() in files

    # Check listtree with pattern
    files = vsi_listtree("/vsimem/", pattern="pyogrio_dir_test*.gpkg")
    assert test_file_path.as_posix() not in files
    assert test_dir_path.as_posix() in files

    files = vsi_listtree("/vsimem/", pattern="pyogrio_test*.gpkg")
    assert test_file_path.as_posix() in files
    assert test_dir_path.as_posix() not in files

    # Remove test_dir and its contents
    vsi_rmtree(test_dir_path.parent)
    files = vsi_listtree("/vsimem/")
    assert test_file_path.as_posix() in files
    assert test_dir_path.as_posix() not in files

    # Remove test_file
    vsi_unlink(test_file_path)
|
||||
|
||||
|
||||
def test_vsimem_rmtree_error(naturalearth_lowres_vsimem):
    """vsi_rmtree rejects files, missing paths, and the /vsimem root."""
    with pytest.raises(NotADirectoryError, match="Path is not a directory"):
        vsi_rmtree(naturalearth_lowres_vsimem)

    with pytest.raises(FileNotFoundError, match="Path does not exist"):
        vsi_rmtree("/vsimem/non-existent")

    # removing the /vsimem root itself (with or without trailing slash) is
    # refused to avoid wiping all in-memory files
    with pytest.raises(
        OSError, match="path to in-memory file or directory is required"
    ):
        vsi_rmtree("/vsimem")
    with pytest.raises(
        OSError, match="path to in-memory file or directory is required"
    ):
        vsi_rmtree("/vsimem/")

    # Verify that naturalearth_lowres_vsimem still exists.
    assert naturalearth_lowres_vsimem.as_posix() in vsi_listtree("/vsimem")
|
||||
|
||||
|
||||
def test_vsimem_unlink_error(naturalearth_lowres_vsimem):
    """vsi_unlink rejects directories and missing files."""
    with pytest.raises(IsADirectoryError, match="Path is a directory"):
        vsi_unlink(naturalearth_lowres_vsimem.parent)

    with pytest.raises(FileNotFoundError, match="Path does not exist"):
        vsi_unlink("/vsimem/non-existent.gpkg")
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,364 +0,0 @@
|
||||
import contextlib
import os
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile

import pyogrio
import pyogrio.raw
from pyogrio._compat import HAS_PYPROJ
from pyogrio.util import get_vsi_path_or_buffer, vsi_path

import pytest

# geopandas is optional; tests that need it are skipped when it is absent
try:
    import geopandas  # noqa: F401

    has_geopandas = True
except ImportError:
    has_geopandas = False
|
||||
|
||||
|
||||
@contextlib.contextmanager
def change_cwd(path):
    """Context manager that temporarily changes the working directory.

    The previous working directory is always restored, even if the body
    raises an exception.
    """
    curdir = os.getcwd()
    os.chdir(str(path))
    try:
        yield
    finally:
        os.chdir(curdir)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path, expected",
    [
        # local file paths that should be passed through as is
        ("data.gpkg", "data.gpkg"),
        (Path("data.gpkg"), "data.gpkg"),
        ("/home/user/data.gpkg", "/home/user/data.gpkg"),
        (r"C:\User\Documents\data.gpkg", r"C:\User\Documents\data.gpkg"),
        ("file:///home/user/data.gpkg", "/home/user/data.gpkg"),
        ("/home/folder # with hash/data.gpkg", "/home/folder # with hash/data.gpkg"),
        # cloud URIs
        ("https://testing/data.gpkg", "/vsicurl/https://testing/data.gpkg"),
        ("s3://testing/data.gpkg", "/vsis3/testing/data.gpkg"),
        ("gs://testing/data.gpkg", "/vsigs/testing/data.gpkg"),
        ("az://testing/data.gpkg", "/vsiaz/testing/data.gpkg"),
        ("adl://testing/data.gpkg", "/vsiadls/testing/data.gpkg"),
        ("adls://testing/data.gpkg", "/vsiadls/testing/data.gpkg"),
        ("hdfs://testing/data.gpkg", "/vsihdfs/testing/data.gpkg"),
        ("webhdfs://testing/data.gpkg", "/vsiwebhdfs/testing/data.gpkg"),
        # archives
        ("zip://data.zip", "/vsizip/data.zip"),
        ("tar://data.tar", "/vsitar/data.tar"),
        ("gzip://data.gz", "/vsigzip/data.gz"),
        ("tar://./my.tar!my.geojson", "/vsitar/./my.tar/my.geojson"),
        (
            "zip://home/data/shapefile.zip!layer.shp",
            "/vsizip/home/data/shapefile.zip/layer.shp",
        ),
        # combined schemes
        ("zip+s3://testing/shapefile.zip", "/vsizip/vsis3/testing/shapefile.zip"),
        (
            "zip+https://s3.amazonaws.com/testing/shapefile.zip",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/shapefile.zip",
        ),
        # auto-prefix zip files
        ("test.zip", "/vsizip/test.zip"),
        ("/a/b/test.zip", "/vsizip//a/b/test.zip"),
        ("a/b/test.zip", "/vsizip/a/b/test.zip"),
        # archives using ! notation should be prefixed by vsizip
        ("test.zip!item.shp", "/vsizip/test.zip/item.shp"),
        ("test.zip!/a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("test.zip!a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("/vsizip/test.zip/a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("zip:///test.zip/a/b/item.shp", "/vsizip//test.zip/a/b/item.shp"),
        # auto-prefix remote zip files
        (
            "https://s3.amazonaws.com/testing/test.zip",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/test.zip",
        ),
        (
            "https://s3.amazonaws.com/testing/test.zip!/a/b/item.shp",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/test.zip/a/b/item.shp",
        ),
        ("s3://testing/test.zip", "/vsizip/vsis3/testing/test.zip"),
        (
            "s3://testing/test.zip!a/b/item.shp",
            "/vsizip/vsis3/testing/test.zip/a/b/item.shp",
        ),
        # in-memory paths pass through unchanged
        ("/vsimem/data.gpkg", "/vsimem/data.gpkg"),
        (Path("/vsimem/data.gpkg"), "/vsimem/data.gpkg"),
    ],
)
def test_vsi_path(path, expected):
    """vsi_path translates URIs and archive schemes into GDAL VSI paths."""
    assert vsi_path(path) == expected
|
||||
|
||||
|
||||
def test_vsi_path_unknown():
    """An unrecognized URI scheme gets passed through as is."""
    assert vsi_path("s4://test/data.geojson") == "s4://test/data.geojson"
|
||||
|
||||
|
||||
def test_vsi_handling_read_functions(naturalearth_lowres_vsi):
    """All read entry points translate zip:// paths before opening."""
    # test that all different read entry points have the path handling
    # (a zip:// path would otherwise fail)
    path, _ = naturalearth_lowres_vsi
    path = "zip://" + str(path)

    result = pyogrio.raw.read(path)
    assert len(result[2]) == 177

    result = pyogrio.read_info(path)
    assert result["features"] == 177

    result = pyogrio.read_bounds(path)
    assert len(result[0]) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_vsi_handling_read_dataframe(naturalearth_lowres_vsi):
    """read_dataframe translates zip:// paths before opening."""
    path, _ = naturalearth_lowres_vsi
    path = "zip://" + str(path)

    result = pyogrio.read_dataframe(path)
    assert len(result) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_path_absolute(data_dir):
    """Absolute paths are accepted as both pathlib.Path and str."""
    # pathlib path
    path = data_dir / "naturalearth_lowres/naturalearth_lowres.shp"
    df = pyogrio.read_dataframe(path)
    assert len(df) == 177

    # str path
    df = pyogrio.read_dataframe(str(path))
    assert len(df) == 177
|
||||
|
||||
|
||||
def test_path_relative(data_dir):
    """All read entry points resolve paths relative to the working directory."""
    path = "naturalearth_lowres/naturalearth_lowres.shp"

    with change_cwd(data_dir):
        result = pyogrio.raw.read(path)
        assert len(result[2]) == 177

        result = pyogrio.read_info(path)
        assert result["features"] == 177

        result = pyogrio.read_bounds(path)
        assert len(result[0]) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_path_relative_dataframe(data_dir):
    """read_dataframe resolves paths relative to the working directory."""
    with change_cwd(data_dir):
        df = pyogrio.read_dataframe("naturalearth_lowres/naturalearth_lowres.shp")
        assert len(df) == 177
|
||||
|
||||
|
||||
def test_uri_local_file(data_dir):
    """file:// URIs are accepted by all read entry points."""
    path = "file://" + str(data_dir / "naturalearth_lowres/naturalearth_lowres.shp")
    result = pyogrio.raw.read(path)
    assert len(result[2]) == 177

    result = pyogrio.read_info(path)
    assert result["features"] == 177

    result = pyogrio.read_bounds(path)
    assert len(result[0]) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_uri_local_file_dataframe(data_dir):
    """file:// URIs are accepted by read_dataframe."""
    uri = "file://" + str(data_dir / "naturalearth_lowres/naturalearth_lowres.shp")
    df = pyogrio.read_dataframe(uri)
    assert len(df) == 177
|
||||
|
||||
|
||||
def test_zip_path(naturalearth_lowres_vsi):
    """Absolute zip://, absolute /vsizip/, and relative zip:// paths all work."""
    path, path_vsi = naturalearth_lowres_vsi
    path_zip = "zip://" + str(path)

    # absolute zip path
    result = pyogrio.raw.read(path_zip)
    assert len(result[2]) == 177

    result = pyogrio.read_info(path_zip)
    assert result["features"] == 177

    result = pyogrio.read_bounds(path_zip)
    assert len(result[0]) == 177

    # absolute vsizip path
    result = pyogrio.raw.read(path_vsi)
    assert len(result[2]) == 177

    result = pyogrio.read_info(path_vsi)
    assert result["features"] == 177

    result = pyogrio.read_bounds(path_vsi)
    assert len(result[0]) == 177

    # relative zip path
    relative_path = "zip://" + path.name
    with change_cwd(path.parent):
        result = pyogrio.raw.read(relative_path)
        assert len(result[2]) == 177

        result = pyogrio.read_info(relative_path)
        assert result["features"] == 177

        result = pyogrio.read_bounds(relative_path)
        assert len(result[0]) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_zip_path_dataframe(naturalearth_lowres_vsi):
    """read_dataframe accepts absolute zip://, /vsizip/, and relative zip:// paths."""
    path, path_vsi = naturalearth_lowres_vsi
    path_zip = "zip://" + str(path)

    # absolute zip path
    df = pyogrio.read_dataframe(path_zip)
    assert len(df) == 177

    # absolute vsizip path
    df = pyogrio.read_dataframe(path_vsi)
    assert len(df) == 177

    # relative zip path
    with change_cwd(path.parent):
        df = pyogrio.read_dataframe("zip://" + path.name)
        assert len(df) == 177
|
||||
|
||||
|
||||
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_detect_zip_path(tmp_path, naturalearth_lowres):
    """Zip archives are auto-detected and members are addressable via '!'."""
    # create a zipfile with 2 shapefiles in a set of subdirectories
    df = pyogrio.read_dataframe(naturalearth_lowres, where="iso_a3 in ('CAN', 'PER')")
    pyogrio.write_dataframe(df.loc[df.iso_a3 == "CAN"], tmp_path / "test1.shp")
    pyogrio.write_dataframe(df.loc[df.iso_a3 == "PER"], tmp_path / "test2.shp")

    path = tmp_path / "test.zip"
    with ZipFile(path, mode="w", compression=ZIP_DEFLATED, compresslevel=5) as out:
        for ext in ["dbf", "prj", "shp", "shx"]:
            # without pyproj no .prj file is written for the shapefiles
            if not HAS_PYPROJ and ext == "prj":
                continue

            # test1 at the archive root
            filename = f"test1.{ext}"
            out.write(tmp_path / filename, filename)

            # test2 nested in subdirectories; the later reads of
            # "!/a/b/test2.shp" rely on this archive layout
            filename = f"test2.{ext}"
            out.write(tmp_path / filename, f"/a/b/{filename}")

    # defaults to the first shapefile found, at lowest subdirectory
    df = pyogrio.read_dataframe(path)
    assert df.iso_a3[0] == "CAN"

    # selecting a shapefile from within the zip requires "!"" archive specifier
    df = pyogrio.read_dataframe(f"{path}!test1.shp")
    assert df.iso_a3[0] == "CAN"

    df = pyogrio.read_dataframe(f"{path}!/a/b/test2.shp")
    assert df.iso_a3[0] == "PER"

    # specifying zip:// scheme should also work
    df = pyogrio.read_dataframe(f"zip://{path}!/a/b/test2.shp")
    assert df.iso_a3[0] == "PER"

    # specifying /vsizip/ should also work but path must already be in GDAL ready
    # format without the "!"" archive specifier
    df = pyogrio.read_dataframe(f"/vsizip/{path}/a/b/test2.shp")
    assert df.iso_a3[0] == "PER"
|
||||
|
||||
|
||||
@pytest.mark.network
def test_url():
    """All read entry points accept an https URL (via /vsicurl/)."""
    url = "https://raw.githubusercontent.com/geopandas/pyogrio/main/pyogrio/tests/fixtures/naturalearth_lowres/naturalearth_lowres.shp"

    result = pyogrio.raw.read(url)
    assert len(result[2]) == 177

    result = pyogrio.read_info(url)
    assert result["features"] == 177

    result = pyogrio.read_bounds(url)
    assert len(result[0]) == 177
|
||||
|
||||
|
||||
@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_url_dataframe():
    """read_dataframe accepts an https URL."""
    url = "https://raw.githubusercontent.com/geopandas/pyogrio/main/pyogrio/tests/fixtures/naturalearth_lowres/naturalearth_lowres.shp"

    assert len(pyogrio.read_dataframe(url)) == 177
|
||||
|
||||
|
||||
@pytest.mark.network
def test_url_with_zip():
    """All read entry points accept a zip+https URL."""
    url = "zip+https://s3.amazonaws.com/fiona-testing/coutwildrnp.zip"

    result = pyogrio.raw.read(url)
    assert len(result[2]) == 67

    result = pyogrio.read_info(url)
    assert result["features"] == 67

    result = pyogrio.read_bounds(url)
    assert len(result[0]) == 67
|
||||
|
||||
|
||||
@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_url_with_zip_dataframe():
    """read_dataframe accepts a zip+https URL."""
    url = "zip+https://s3.amazonaws.com/fiona-testing/coutwildrnp.zip"
    df = pyogrio.read_dataframe(url)
    assert len(df) == 67
|
||||
|
||||
|
||||
@pytest.fixture
def aws_env_setup(monkeypatch):
    """Allow anonymous S3 access so the public test bucket can be read."""
    monkeypatch.setenv("AWS_NO_SIGN_REQUEST", "YES")
|
||||
|
||||
|
||||
@pytest.mark.network
def test_uri_s3(aws_env_setup):
    """All read entry points accept a zip+s3 URI."""
    url = "zip+s3://fiona-testing/coutwildrnp.zip"

    result = pyogrio.raw.read(url)
    assert len(result[2]) == 67

    result = pyogrio.read_info(url)
    assert result["features"] == 67

    result = pyogrio.read_bounds(url)
    assert len(result[0]) == 67
|
||||
|
||||
|
||||
@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_uri_s3_dataframe(aws_env_setup):
    """read_dataframe accepts a zip+s3 URI."""
    df = pyogrio.read_dataframe("zip+s3://fiona-testing/coutwildrnp.zip")
    assert len(df) == 67
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path, expected",
    [
        (Path("/tmp/test.gpkg"), str(Path("/tmp/test.gpkg"))),
        (Path("/vsimem/test.gpkg"), "/vsimem/test.gpkg"),
    ],
)
def test_get_vsi_path_or_buffer_obj_to_string(path, expected):
    """Verify that get_vsi_path_or_buffer retains forward slashes in /vsimem paths.

    The /vsimem paths should keep forward slashes for GDAL to recognize them as such.
    However, on Windows systems, forward slashes are by default replaced by backslashes,
    so this test verifies that this doesn't happen for /vsimem paths.
    """
    assert get_vsi_path_or_buffer(path) == expected
|
||||
|
||||
|
||||
def test_get_vsi_path_or_buffer_fixtures_to_string(tmp_path):
    """A regular filesystem Path is converted to its platform str form."""
    path = tmp_path / "test.gpkg"
    assert get_vsi_path_or_buffer(path) == str(path)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,56 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
from pyogrio import vsi_listtree, vsi_unlink
|
||||
from pyogrio.raw import read, write
|
||||
from pyogrio.util import vsimem_rmtree_toplevel
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_vsimem_rmtree_toplevel(naturalearth_lowres):
    """vsimem_rmtree_toplevel removes the top-level dir containing a file."""
    # Prepare test data in /vsimem/
    meta, _, geometry, field_data = read(naturalearth_lowres)
    meta["spatial_index"] = False
    meta["geometry_type"] = "MultiPolygon"
    test_dir_path = Path(f"/vsimem/test/{naturalearth_lowres.stem}.gpkg")
    test_dir2_path = Path(f"/vsimem/test2/test2/{naturalearth_lowres.stem}.gpkg")

    write(test_dir_path, geometry, field_data, **meta)
    write(test_dir2_path, geometry, field_data, **meta)

    # Check if everything was created properly with listtree
    files = vsi_listtree("/vsimem/")
    assert test_dir_path.as_posix() in files
    assert test_dir2_path.as_posix() in files

    # Test deleting parent dir of file in single directory
    vsimem_rmtree_toplevel(test_dir_path)
    files = vsi_listtree("/vsimem/")
    assert test_dir_path.parent.as_posix() not in files
    assert test_dir2_path.as_posix() in files

    # Test deleting top-level dir of file in a subdirectory
    vsimem_rmtree_toplevel(test_dir2_path)
    assert test_dir2_path.as_posix() not in vsi_listtree("/vsimem/")
|
||||
|
||||
|
||||
def test_vsimem_rmtree_toplevel_error(naturalearth_lowres):
    """vsimem_rmtree_toplevel on a missing file raises and leaves others intact."""
    # Prepare test data in /vsimem
    meta, _, geometry, field_data = read(naturalearth_lowres)
    meta["spatial_index"] = False
    meta["geometry_type"] = "MultiPolygon"
    test_file_path = Path(f"/vsimem/pyogrio_test_{naturalearth_lowres.stem}.gpkg")

    write(test_file_path, geometry, field_data, **meta)
    assert test_file_path.as_posix() in vsi_listtree("/vsimem/")

    # Deleting parent dir of non-existent file should raise an error.
    with pytest.raises(FileNotFoundError, match="Path does not exist"):
        vsimem_rmtree_toplevel("/vsimem/test/non-existent.gpkg")

    # File should still be there
    assert test_file_path.as_posix() in vsi_listtree("/vsimem/")

    # Cleanup.
    vsi_unlink(test_file_path)
    # compare the posix string, not the Path object: vsi_listtree returns
    # strings, so a Path would never be "in" the list and the assert would
    # pass vacuously
    assert test_file_path.as_posix() not in vsi_listtree("/vsimem/")
|
||||
Reference in New Issue
Block a user