470 lines
21 KiB
Python

"""This module tests if the database abstraction is working correctly."""
# Standard library
from io import StringIO
from os.path import join
import sqlite3
from tempfile import TemporaryDirectory
import warnings
# Typing
from typing import cast
from typing import List
from typing import Optional
# Unit testing
from unittest.mock import patch
from unittest import skip
from unittest import skipIf
from unittest import TestCase
# Verification
from hypothesis import given
from hypothesis import HealthCheck
from hypothesis import settings
import hypothesis.strategies as st
import numpy
# Geometry
from shapely.geometry import Point
# Package under test
from pyrate.common.charts.db import to_wkb
from pyrate.common.charts import SpatialiteDatabase
from pyrate.plan.geometry import CartesianLocation
from pyrate.plan.geometry import CartesianPolygon
from pyrate.plan.geometry import Geospatial
from pyrate.plan.geometry import LocationType
from pyrate.plan.geometry import PolarGeometry
from pyrate.plan.geometry import PolarLocation
from pyrate.plan.geometry import PolarPolygon
from pyrate.plan.geometry import PolarRoute
# Flags and helpers
from pyrate.common.testing import IS_CI
from pyrate.common.testing import IS_EXTENDED_TESTING
from pyrate.common.testing import SPATIALITE_AVAILABLE
from pyrate.common.testing.strategies.geometry import location_types
from pyrate.common.testing.strategies.geometry import polar_locations
from pyrate.common.testing.strategies.geometry import polar_objects
from pyrate.common.testing.strategies.geometry import polar_polygons
from pyrate.common.testing.strategies.geometry import polar_routes_stable
# Skipping is only permitted outside of CI: in CI the tests are always executed, so that they are
# guaranteed to run regularly at least there (even if spatialite turns out to be unavailable).
SKIP_IF_SPATIALITE_IS_MISSING = skipIf(
    not (SPATIALITE_AVAILABLE or IS_CI),
    "allow spatialite to be missing and skip the tests in that case",
)
# Hypothesis profile with a reduced number of examples, as the database tests would otherwise take
# way too long (especially in the extended test runs, which still get a larger budget).
TEST_REDUCED_COUNT = settings(
    deadline=1000,
    suppress_health_check=(HealthCheck.too_slow,),
    max_examples=(500 if IS_EXTENDED_TESTING else 50),
)
@SKIP_IF_SPATIALITE_IS_MISSING
class TestDatabase(TestCase):
    """Test for basic use of the database."""

    @staticmethod
    def _apply_id_if_missing(geometry: Geospatial, identifier: Optional[int]) -> None:
        """Sets ``identifier`` on ``geometry``, but only if the geometry does not have one yet.

        Args:
            geometry: the geometry to be (possibly) modified in place
            identifier: the identifier to assign if ``geometry.identifier`` is ``None``
        """
        if geometry.identifier is None:
            geometry.identifier = identifier

    def test_empty_creation(self) -> None:
        """Tests whether the creation of a new database is not crashing anything."""
        with SpatialiteDatabase(":memory:") as database:
            self.assertEqual(len(database), database.count_geometries())
            self.assertEqual(
                database.count_geometries(), 0, "a freshly initialized database should contain no polygons"
            )
            self.assertEqual(
                database.count_vertices(), 0, "a freshly initialized database should contain no vertices"
            )

    def test_disable_issue_create_statement(self) -> None:
        """Tests whether initializing with ``issue_create_statement=False`` fails."""
        with SpatialiteDatabase(":memory:", issue_create_statement=False) as database:
            with self.assertRaises(sqlite3.OperationalError):
                database.count_geometries()

    def test_write_invalid_geometry_no_update_exception(self) -> None:
        """Tests that an exception is raised if an invalid geometry is attempted to be written.

        This tests the case where ``update=False``.
        """
        with self.assertRaises(ValueError):
            with SpatialiteDatabase(":memory:") as database:
                point = PolarLocation(latitude=-70.2, longitude=+120.444, name="Pointy Point")
                invalid_geometry = PolarPolygon([point, point, point])
                database.write_geometry(invalid_geometry, update=False)

    def test_write_invalid_geometry_no_update_suppressed(self) -> None:
        """Tests that NO exception is raised if an invalid geometry is attempted to be written.

        This tests the case where ``update=False`` and ``raise_on_failure=False``.
        Instead of an exception, a :class:`UserWarning` is expected. To detect it, all warnings are
        temporarily turned into errors.
        """
        with self.assertRaises(UserWarning):
            with warnings.catch_warnings():
                warnings.simplefilter("error")
                with SpatialiteDatabase(":memory:") as database:
                    point = PolarLocation(latitude=-70.2, longitude=+120.444, name="Pointy Point")
                    invalid_geometry = PolarPolygon([point, point, point])
                    database.write_geometry(invalid_geometry, update=False, raise_on_failure=False)

    @skip("SpatialiteDatabase.write_geometry() currently does not detect it when update=True")
    def test_write_invalid_geometry_with_update_exception(self) -> None:
        """Tests that an exception is raised if an invalid geometry is attempted to be written.

        This tests the case where ``update=True``.
        """
        with self.assertRaises(ValueError):
            with SpatialiteDatabase(":memory:") as database:
                point = PolarLocation(latitude=-70.2, longitude=+120.444, name="Pointy Point")
                invalid_geometry = PolarPolygon([point, point, point])
                database.write_geometry(invalid_geometry, update=True)

    @skip("SpatialiteDatabase.write_geometry() currently does not detect it when update=True")
    def test_write_invalid_geometry_with_update_suppressed(self) -> None:
        """Tests that NO exception is raised if an invalid geometry is attempted to be written.

        This tests the case where ``update=True`` and ``raise_on_failure=False``.
        Instead of an exception, a :class:`UserWarning` is expected. To detect it, all warnings are
        temporarily turned into errors.
        """
        with self.assertRaises(UserWarning):
            with warnings.catch_warnings():
                warnings.simplefilter("error")
                with SpatialiteDatabase(":memory:") as database:
                    point = PolarLocation(latitude=-70.2, longitude=+120.444, name="Pointy Point")
                    invalid_geometry = PolarPolygon([point, point, point])
                    database.write_geometry(invalid_geometry, update=True, raise_on_failure=False)

    def test_convert_invalid_geometry_type(self) -> None:
        """Tests whether converting an unsupported geometry type raises a :class:`NotImplementedError`."""
        # This obviously is a faulty cast, but we need it to trigger the exception
        polar = cast(PolarLocation, CartesianLocation(55, 55))

        # Only the conversion itself is expected to raise, so only that call is guarded
        with self.assertRaises(NotImplementedError):
            to_wkb(polar)

    def test_create_twice(self) -> None:
        """Tests that opening/creating/initializing a database twice does not cause any problems.

        This method checks for output on stdout and stderr since sqlite will sometimes just print some
        warnings instead of raising exceptions. This is a regression test.
        """
        # this creates a shared database (within this process); need to pass uri=True later on
        uri = "file::memory:?cache=shared"

        # capture stdout & stderr
        with patch("sys.stdout", new=StringIO()) as fake_stdout, patch("sys.stderr", new=StringIO()) as fake_stderr:
            # open two databases
            with SpatialiteDatabase(uri, uri=True):
                with SpatialiteDatabase(uri, uri=True):
                    pass

        # assert that nothing got printed; comparing the text itself shows the output on failure
        self.assertEqual(fake_stdout.getvalue(), "")
        self.assertEqual(fake_stderr.getvalue(), "")

    def test_clear_and_count(self) -> None:
        """Tests :meth:`~.SpatialiteDatabase.clear` and the two counting methods."""
        with SpatialiteDatabase(":memory:") as database:
            self.assertEqual(len(database), 0)
            self.assertEqual(database.count_vertices(), 0)

            # a first geometry is added
            poly = PolarPolygon([PolarLocation(0, 0), PolarLocation(0, 1), PolarLocation(1, 1)])
            poly.identifier = 1
            database.write_geometry(poly)
            self.assertEqual(len(database), 1)
            self.assertEqual(database.count_vertices(), len(poly.locations))

            # writing again with the same identifier and update=True must not add another entry
            poly.identifier = 1
            database.write_geometry(poly, update=True)
            self.assertEqual(len(database), 1)
            self.assertEqual(database.count_vertices(), len(poly.locations))

            # writing with a new identifier adds a second entry
            poly.identifier = 2
            database.write_geometry(poly)
            self.assertEqual(len(database), 2)
            self.assertEqual(database.count_vertices(), len(poly.locations) * 2)

            # clearing removes everything again
            database.clear()
            self.assertEqual(len(database), 0)
            self.assertEqual(database.count_vertices(), 0)

    def test_result_multi_geometry(self) -> None:
        """Tests whether the DB can correctly handle query results that are multi-polygons/-routes."""
        # The input data is the same for both geometry types, so it is only created once
        horseshoe = numpy.array(
            [
                [-3.06913376, 47.50722936],
                [-3.0893898, 47.52566325],
                [-3.07788849, 47.54640812],
                [-3.03050995, 47.55278059],
                [-2.9875946, 47.54675573],
                [-2.97849655, 47.53006788],
                [-2.97712326, 47.51801223],
                [-2.97849655, 47.50908464],
                [-3.04887772, 47.50653362],
                [-3.04922104, 47.51047605],
                [-2.9898262, 47.51349065],
                [-2.99480438, 47.54084606],
                [-3.03136826, 47.54698747],
                [-3.06947708, 47.54327953],
                [-3.07806015, 47.52763379],
                [-3.06741714, 47.51198337],
                [-3.06913376, 47.50722936],
            ]
        )
        touch_point = PolarLocation(longitude=-3.0588340759277344, latitude=47.50943249496333)

        # Test with both routes and polygons
        for geometry_type in (PolarRoute, PolarPolygon):
            with self.subTest(f"With type {geometry_type.__name__}"):
                with SpatialiteDatabase(":memory:") as database:
                    database.write_geometry(geometry_type.from_numpy(horseshoe))  # type: ignore
                    result = list(database.read_geometries_around(touch_point, radius=3_000))

                    # the single written geometry is expected to be returned as 2 or 3 separate parts
                    self.assertIn(len(result), {2, 3})

    def _random_insert_and_extract_all_generic(
        self, geometry: PolarGeometry, location_type: LocationType, update: bool
    ) -> None:
        """Tests whether inserting and then reading works in a very basic setting.

        Args:
            geometry: the geometry to be written to and then read back from a fresh database
            location_type: the type to filter by when reading; may or may not match the one of ``geometry``
            update: passed on as ``update`` to :meth:`~.SpatialiteDatabase.write_geometry`
        """
        with SpatialiteDatabase(":memory:") as database:
            # it should be empty in the beginning
            self.assertEqual(0, len(database))
            self.assertEqual(0, database.count_geometries())
            self.assertEqual(0, database.count_vertices())

            # insert the polygon
            try:
                database.write_geometry(geometry, update=update)
            except ValueError:
                return  # this example is corrupt, try the next one
            else:
                if update and len(database) == 0:  # Errors cannot be checked if update==True
                    return  # this example is corrupt, try the next one

            # now the database should not be empty anymore
            self.assertEqual(1, len(database))
            self.assertEqual(1, database.count_geometries())
            if isinstance(geometry, (PolarRoute, PolarPolygon)):
                # Some repeated points might be removed, so we cannot check the exact number of vertices
                self.assertGreaterEqual(len(geometry.locations), database.count_vertices())
            else:  # is a PolarLocation
                self.assertEqual(1, database.count_vertices())

            # the element should be included in "all"
            read_obstacles = list(database.read_all())
            self.assertEqual(1, len(read_obstacles))

            # it should only be included if the type matches
            all_filtered = list(database.read_all(only_location_type=location_type))
            if geometry.location_type == location_type:
                self.assertEqual(all_filtered, read_obstacles)
            else:
                self.assertEqual(len(all_filtered), 0)

            # and it should be the one that we have written into the database in the first place
            read_obstacles_single = read_obstacles[0]

            # the id may be newly generated if polygon has the id None, so
            # set it to the generated one for the sake of equality testing
            self._apply_id_if_missing(geometry, read_obstacles_single.identifier)

            assert isinstance(read_obstacles_single, type(geometry))  # Make mypy understand this check
            if isinstance(geometry, PolarPolygon):
                self.assertTrue(geometry.almost_congruent(read_obstacles_single))  # type: ignore
            else:
                self.assertTrue(geometry.equals_exact(read_obstacles_single, tolerance=1e-3))

    @given(polar_locations(), location_types(), st.booleans())
    @TEST_REDUCED_COUNT
    def test_random_insert_and_extract_all_locations(
        self, geometry: PolarGeometry, location_type: LocationType, update: bool
    ) -> None:
        """Basic test with locations."""
        self._random_insert_and_extract_all_generic(geometry, location_type, update)

    @given(polar_routes_stable(), location_types(), st.booleans())
    @TEST_REDUCED_COUNT
    def test_random_insert_and_extract_all_routes(
        self, geometry: PolarGeometry, location_type: LocationType, update: bool
    ) -> None:
        """Basic test with routes."""
        self._random_insert_and_extract_all_generic(geometry, location_type, update)

    @given(polar_polygons(), location_types(), st.booleans())
    @TEST_REDUCED_COUNT
    def test_random_insert_and_extract_all_polygons(
        self, geometry: PolarGeometry, location_type: LocationType, update: bool
    ) -> None:
        """Basic test with polygons."""
        self._random_insert_and_extract_all_generic(geometry, location_type, update)

    def test_copy_to_other_database(self) -> None:
        """Tests whether database can be copied.

        This test is not performed using hypothesis as it takes too long.
        """
        location_1 = PolarLocation(latitude=-76.400, longitude=-171.924)
        location_2 = PolarLocation(latitude=-70.400, longitude=-171.924)
        location_3 = PolarLocation(latitude=-76.400, longitude=-170.924)
        polygons = [
            PolarPolygon(
                locations=[location_1, location_2, location_3],
                name="K",
                identifier=1234145,
                location_type=LocationType.SHALLOW_WATER,
            ),
            PolarPolygon(locations=[location_1, location_2, location_3], identifier=2342),
        ]

        with TemporaryDirectory() as directory_name:
            database_file_name = join(directory_name, "other_db.sqlite")

            # fill an in-memory database and copy its contents into a database file
            with SpatialiteDatabase(":memory:") as first_database:
                with first_database.disable_synchronization():
                    first_database.write_geometries(polygons)

                first_database.copy_contents_to_database(database_file_name)

            # read everything back from the file while the temporary directory still exists
            with SpatialiteDatabase(database_file_name) as second_database:
                read = list(second_database.read_all())

        def sorter(geometry: Geospatial) -> int:
            """Sort key: the identifier of the geometry, or ``-1`` if it is missing (or zero)."""
            return geometry.identifier or -1

        # bring both lists into the same order before comparing them pairwise
        read = sorted(read, key=sorter)
        polygons = sorted(polygons, key=sorter)

        self.assertEqual(len(read), len(polygons))
        for polygon_a, polygon_b in zip(read, polygons):
            assert isinstance(polygon_a, PolarPolygon)  # Make mypy understand this check
            self.assertTrue(polygon_a.equals_almost_congruent(polygon_b, rel_tolerance=1e-15))

    @settings(max_examples=50 if IS_EXTENDED_TESTING else 10, suppress_health_check=(HealthCheck.too_slow,))
    @given(st.lists(polar_objects(), min_size=0, max_size=3))
    def test_database_simplification_zero_tolerance(self, geometries: List[PolarGeometry]) -> None:
        """Tests that simplifying with a tolerance of zero leaves the database objects (almost) unchanged."""
        # set unique identifiers
        for index, geometry in enumerate(geometries):
            geometry.identifier = index

        with SpatialiteDatabase(":memory:") as database:
            try:
                database.write_geometries(geometries)
            except ValueError:
                return  # this example is corrupt, try the next one

            database.simplify_contents(0.0)
            read = list(database.read_all())

            # exactly as many geometries as were written should be returned
            self.assertEqual(len(read), len(geometries))
            for original, read_back in zip(geometries, read):
                self.assertTrue(original.equals_exact(read_back, tolerance=1e-3))

    def test_database_simplification(self) -> None:
        """Tests whether database objects can be simplified."""
        # create a polygon from a buffered point and convert it to polar coordinates
        points = numpy.array(Point(0, 0).buffer(10_000, resolution=16).exterior.coords)
        cartesian_polygon = CartesianPolygon.from_numpy(points, identifier=10042)
        polygon = cartesian_polygon.to_polar(origin=PolarLocation(-50.111, -30))
        self.assertEqual(len(polygon.locations), 65)

        with SpatialiteDatabase(":memory:") as database:
            database.write_geometry(polygon)
            database.simplify_contents(100)
            read = list(database.read_all())

            # only one polygon should be returned
            self.assertEqual(len(read), 1)
            read_polygon = read[0]
            assert isinstance(read_polygon, PolarPolygon)  # Make mypy understand this check

            # That polygon should be similar, but have fewer vertices than the original one
            self.assertEqual(33, len(read_polygon.locations))
            self.assertAlmostEqual(read_polygon.area, polygon.area, places=-3)
            self.assertTrue(read_polygon.equals_almost_congruent(polygon, rel_tolerance=0.01))

            # else, it should be the same
            polygon.locations = read_polygon.locations
            self.assertEqual(polygon, read_polygon)
@SKIP_IF_SPATIALITE_IS_MISSING
class TestDatabaseReadAroundHandcrafted(TestCase):
    """Tests :meth:`pyrate.common.charts.SpatialiteDatabase.read_geometries_around` with known examples."""

    # A polygon around the origin; the queries in the tests below are placed close to it
    included_polygon = PolarPolygon(
        [PolarLocation(1, 1), PolarLocation(1, -1), PolarLocation(-1, -1), PolarLocation(-1, 1)], identifier=1
    )
    # A polygon that none of the queries in the tests below is expected to return
    excluded_polygon = PolarPolygon(
        [PolarLocation(56, 170), PolarLocation(56.5, 170), PolarLocation(56, 170.2)], identifier=2
    )
    both_polygons = [included_polygon, excluded_polygon]

    def setUp(self) -> None:
        """Creates a fresh in-memory database containing both polygons for each test."""
        super().setUp()
        self.database = SpatialiteDatabase(":memory:")
        try:
            self.database.write_geometries(self.both_polygons)
        except Exception:
            # tearDown() is not run if setUp() fails, so the database has to be closed right here
            self.database.close()
            raise

    def tearDown(self) -> None:
        """Closes the database that was opened in :meth:`setUp`."""
        self.database.close()
        super().tearDown()

    def test_read_around_includes_full(self) -> None:
        """Reads a complete polygon."""
        around = list(self.database.read_geometries_around(PolarLocation(0, 2), radius=500_000))
        self.assertEqual(1, len(around))

        read = around[0]
        assert isinstance(read, PolarPolygon)  # Make mypy understand this check
        self.assertTrue(self.included_polygon.almost_congruent(read))

    def test_read_around_includes_nothing(self) -> None:
        """Tests that :meth:`pyrate.common.charts.SpatialiteDatabase.read_geometries_around` works correctly.

        Reads nothing.
        """
        around = list(self.database.read_geometries_around(PolarLocation(0, 2), radius=0))
        self.assertEqual(0, len(around))

    def test_read_around_includes_partial(self) -> None:
        """Tests that :meth:`pyrate.common.charts.SpatialiteDatabase.read_geometries_around` works correctly.

        Reads about half of the polygon.
        """
        query_point = PolarLocation(latitude=0.0, longitude=5.0)
        # this is the distance to center of self.included_polygon
        radius = query_point.distance(PolarLocation(0.0, 0.0))

        read = list(self.database.read_geometries_around(query_point, radius=radius))
        self.assertEqual(1, len(read))
        read_polygon = read[0]
        assert isinstance(read_polygon, PolarPolygon)  # Make mypy understand this check

        # these shall only be very roughly similar, as half of the polygon is missing
        # thus, we allow for a very large relative tolerance
        self.assertTrue(read_polygon.equals_almost_congruent(self.included_polygon, rel_tolerance=0.6))

        # this is roughly the part that should be included in the result
        eastern_half = PolarPolygon(
            [PolarLocation(1, 0), PolarLocation(-1, 0), PolarLocation(-1, 1), PolarLocation(1, 1)]
        )
        # thus, we allow for less relative tolerance
        self.assertTrue(read_polygon.almost_congruent(eastern_half, rel_tolerance=0.15))