aki_prj23_transparenzregister/tests/utils/data_transfer_test.py

745 lines
25 KiB
Python

"""Test the transfer functions from mongodb to sql."""
import random
import string
from datetime import date
from typing import Any
import numpy as np
import pandas as pd
import pytest
import sqlalchemy as sa
from pytest_mock import MockerFixture
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from aki_prj23_transparenzregister.utils import data_transfer
@pytest.mark.parametrize(
    ("original", "expected"),
    [
        (
            {"name": "Amtsgericht Herne", "city": "Herne"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": ""},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": None},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": "Something Wrong"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": "NoName"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
    ],
)
def test_refine_district_court_entry(original: dict, expected: dict) -> None:
    """Tests the transformation/the cleaning of the district court entry.

    Args:
        original: The raw district court entry handed to the refinement.
        expected: The entry whose values (in order) the refinement should return.
    """
    # The raw entry has to come from the parametrization; a fixed literal would
    # make every case exercise the very same, already clean input.
    refined = data_transfer._refine_district_court_entry(**original)
    assert refined == tuple(expected.values())
@pytest.mark.parametrize(
    "defect_data",
    [
        {"name": "Wrong Herne", "city": "Herne"},
        {"name": "Wrong Herne", "city": "NoName"},
        {"city": "Herne", "name": None},
        {"city": "Herne", "name": ""},
    ],
)
def test_refine_district_court_entry_defect_data(defect_data: dict[str, str]) -> None:
    """Verify that uncorrectable district court data raises a DataInvalidError."""
    expected_failure = pytest.raises(data_transfer.DataInvalidError)
    with expected_failure:
        data_transfer._refine_district_court_entry(**defect_data)
@pytest.mark.repeat(3)
def test_empty_db_fixture(empty_db: Session) -> None:
    """Verify that the ``empty_db`` fixture yields a SQLAlchemy session."""
    is_session = isinstance(empty_db, Session)
    assert is_session
@pytest.mark.parametrize(
    ("name", "city", "id"),
    [
        ("Amtsgericht Bochum", "Bochum", 1),
        ("Amtsgericht Dortmund", "Dortmund", 2),
        ("Amtsgericht Iserlohn", "Iserlohn", None),
    ],
)
def test__read_district_court_id(
    name: str, city: str, id: int | None, full_db: Session
) -> None:
    """Verify the lookup of a district court id; the last case expects ``None``."""
    found_id = data_transfer._read_district_court_id(name, city, full_db)
    assert found_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str", "id"),
    [
        ("Max", "Mustermann", "2023-01-01", 1),
        ("Sabine", "Mustermann", "2023-01-01", 2),
        ("Some Firstname", "Some Surname", "2023-01-01", 3),
        ("Some Firstname", "Some Surname", "2023-01-02", 4),
        ("Other Firstname", "Other Surname", "2023-01-02", 5),
        (None, "Other Surname", "2023-01-02", None),
        ("Does not exist", "Other Surname", "2023-01-02", None),
        ("Other Firstname", "Does not exists", "2023-01-02", None),
        ("Other Firstname", "Other Surname", "1900-01-02", None),
        ("Other Firstname", None, "2023-01-02", None),
    ],
)
def test__read_person_id(
    firstname: str, surname: str, date_str: str, id: int | None, full_db: Session
) -> None:
    """Verify the lookup of a person id; several cases expect ``None``."""
    birthday = date.fromisoformat(date_str)
    found_id = data_transfer._read_person_id(firstname, surname, birthday, full_db)
    assert found_id == id
@pytest.mark.parametrize(
    ("name", "city", "id"),
    [
        ("Amtsgericht Bochum", "Bochum", 1),
        ("Amtsgericht Dortmund", "Dortmund", 2),
        ("Amtsgericht Iserlohn", "Iserlohn", 3),
        ("Amtsgericht Köln", "Köln", 3),
    ],
)
def test_get_district_court_id(name: str, city: str, id: int, full_db: Session) -> None:
    """Verify that a court id is returned, with unknown courts being added to the db on the fly."""
    court_id = data_transfer.get_district_court_id(name, city, full_db)
    assert court_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str", "id"),
    [
        ("Max", "Mustermann", "2023-01-01", 1),
        ("Sabine", "Mustermann", "2023-01-01", 2),
        ("Some Firstname", "Some Surname", "2023-01-01", 3),
        ("Some Firstname", "Some Surname", "2023-01-02", 4),
        ("Other Firstname", "Other Surname", "2023-01-02", 5),
        ("Does not exist", "Other Surname", "2023-01-02", 6),
        ("Other Firstname", "Does not exists", "2023-01-02", 6),
        ("Other Firstname", "Other Surname", "1900-01-02", 6),
    ],
)
def test_get_person_id(
    firstname: str, surname: str, date_str: str, id: int, full_db: Session
) -> None:
    """Verify that a person id is returned, with unknown persons being added to the db on the fly."""
    birthday = date.fromisoformat(date_str)
    person_id = data_transfer.get_person_id(firstname, surname, birthday, full_db)
    assert person_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str"),
    [
        ("", "Other Surname", "2023-01-02"),
        ("Other Firstname", "", "2023-01-02"),
        ("Other Firstname", "Other Surname", ""),
    ],
)
def test_get_person_id_value_check(
    firstname: str, surname: str, date_str: str | None, full_db: Session
) -> None:
    """Verify that a person with an empty name part or a missing birthday is rejected."""
    # An empty date string stands for "no birthday" and is handed over as None.
    birthday = date.fromisoformat(date_str) if date_str else None
    expected_failure = pytest.raises(
        data_transfer.DataInvalidError, match="At least one of the three values name:"
    )
    with expected_failure:
        data_transfer.get_person_id(firstname, surname, birthday, full_db)  # type: ignore
@pytest.mark.parametrize(
    ("name", "zip_code", "city", "id"),
    [
        ("Some Company GmbH", "", "", 1),
        ("Some Company GmbH", "12345", "", 1),
        ("Some Company GmbH", "12345", "TV City", 1),
        ("Some Company GmbH", "", "TV City", 1),
        ("Other Company GmbH", "", "", 2),
        ("Other Company GmbH", "12345", "", 2),
        ("Other Company GmbH", "12345", "TV City", 2),
        ("Other Company GmbH", "", "TV City", 2),
        ("Third Company GmbH", "", "", 3),
    ],
)
def test_get_company_id(
    name: str, zip_code: str, city: str, id: int | None, full_db: Session
) -> None:
    """Verify that the expected company id is found for name, zip code and city."""
    company_id = data_transfer.get_company_id(name, zip_code, city, full_db)
    assert company_id == id
@pytest.mark.parametrize(
    ("name", "zip_code", "city"),
    [
        ("Does not exist", "", ""),
        ("Does not exist", "41265", ""),
        ("Does not exist", "", "Some City"),
        ("Other Company GmbH", "TV City", "54321"),
        ("Other Company GmbH", "OtherCity", "12345"),
        ("Other Company GmbH", "OtherCity", "54321"),
    ],
)
def test_get_company_id_not_found(
    name: str,
    zip_code: str,
    city: str,
    full_db: Session,
) -> None:
    """Verify that looking up a company that cannot be matched raises a KeyError."""
    lookup_failure = pytest.raises(KeyError)
    with lookup_failure:
        data_transfer.get_company_id(name, zip_code, city, full_db)
@pytest.mark.parametrize("name", ["", None])
def test_get_company_id_nameless(name: str | None, full_db: Session) -> None:
    """Verify that a company lookup without a usable name raises a DataInvalidError."""
    expected_failure = pytest.raises(data_transfer.DataInvalidError)
    with expected_failure:
        data_transfer.get_company_id(name, "zip_code", "city", full_db)  # type: ignore
def get_random_string(length: int) -> str:
    """Build a random string from digits, ASCII letters and the space character.

    Args:
        length: The number of characters to draw.

    Returns:
        The string assembled from ``length`` random draws.
    """
    alphabet = f"{string.digits}{string.ascii_letters} "
    # One ``random.choice`` call per character keeps the result reproducible
    # for a given ``random.seed``.
    drawn = [random.choice(alphabet) for _ in range(length)]
    return "".join(drawn)
def get_random_zip() -> str:
    """Build a random zip code consisting of five digits."""
    digits = [random.choice(string.digits) for _ in range(5)]
    return "".join(digits)
def company_generator(seed: int) -> dict[str, Any]:
    """Build a pseudo random company entry that is reproducible for a given seed.

    Args:
        seed: The value used to seed the ``random`` module before drawing.

    Returns:
        A company entry with the keys ``id``, ``name``, ``location`` and ``last_update``.
    """
    random.seed(seed)

    def coin_flip() -> bool:
        """Draw a random boolean."""
        return random.choice([True, False])

    # NOTE: the statements below are ordered by their random draws; reordering
    # them would change the generated company for a given seed.
    city = "Dortmund" if coin_flip() else get_random_string(random.randint(5, 30))
    court_city = city if coin_flip() else None
    hr_number = get_random_string(7)
    company_name = get_random_string(random.randint(3, 150))
    location_city = city if coin_flip() else None
    zip_code = get_random_zip() if coin_flip() else None
    street = get_random_string(20) if coin_flip() else None
    update_year = random.randint(2000, 2023)

    district_court = {"name": f"Amtsgericht {city}", "city": court_city}
    location = {"city": location_city, "zip_code": zip_code, "street": street}
    return {
        "id": {"district_court": district_court, "hr_number": hr_number},
        "name": company_name,
        "location": location,
        "last_update": date(update_year, 1, 1),
    }
@pytest.mark.parametrize("seed", list(range(70, 75)))
def test_add_company(seed: int, full_db: Session) -> None:
    """Verify that a generated company can be added to the db without an error."""
    generated_company = company_generator(seed)
    data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_name(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Verify that adding a company whose name is ``None`` raises a DataInvalidError."""
    company = company_generator(seed)
    company["name"] = overwrite
    if overwrite is not None:
        # NOTE(review): for "" and " " nothing is executed or asserted here;
        # confirm whether these names should be rejected as well.
        return
    expected_failure = pytest.raises(
        data_transfer.DataInvalidError,
        match="The company name needs to be valid ",
    )
    with expected_failure:
        data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_city(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Verify that a company with an empty or missing city can still be added."""
    company = company_generator(seed)
    location = company["location"]
    location["city"] = overwrite
    data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_zip_code(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Verify that a company with an empty or missing zip code can still be added."""
    company = company_generator(seed)
    location = company["location"]
    location["zip_code"] = overwrite
    data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", [None])
def test_add_company_broken_date(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Verify that adding a company without a last update date raises an IntegrityError."""
    company = company_generator(seed)
    company["last_update"] = overwrite
    integrity_failure = pytest.raises(sa.exc.IntegrityError)
    with integrity_failure:
        data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_district_court(
    seed: int, overwrite: str | None, full_db: Session, mocker: MockerFixture
) -> None:
    """Verify that a company with an unusable district court name is rejected."""
    company = company_generator(seed)
    district_court = company["id"]["district_court"]
    district_court["name"] = overwrite
    district_court["city"] = get_random_string(10)
    expected_failure = pytest.raises(
        data_transfer.DataInvalidError,
        match="There is no court name|The name of the district court does not start correctly",
    )
    with expected_failure:
        data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(0, 25, 5)))
def test_add_companies(seed: int, mocker: MockerFixture, full_db: Session) -> None:
    """Verify that a batch of distinct companies is added without any reported problem."""
    rnd_generator = np.random.default_rng(seed)
    # The batch size is drawn first, then that many (de-duplicated) company seeds.
    batch_size = rnd_generator.integers(1, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_called_once_with("When adding companies no problems occurred.")
    spy_warning.assert_not_called()
    expected_debug_calls = len(companies)
    assert spy_debug.call_count == expected_debug_calls
@pytest.mark.parametrize("seed", list(range(1, 25, 5)))
def test_add_companies_duplicate(
    seed: int, mocker: MockerFixture, full_db: Session
) -> None:
    """Verify that three duplicated companies are reported as three problems."""
    rnd_generator = np.random.default_rng(seed)
    # The batch size is drawn first, then that many (de-duplicated) company seeds.
    batch_size = rnd_generator.integers(4, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]
    unique_companies = len(companies)
    # Re-append the last three companies so that they are transferred twice.
    companies.extend(companies[-3:])

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_not_called()
    spy_warning.assert_called_once_with(
        "When adding companies 3 problems occurred 0 where caused by invalid data."
    )
    assert spy_debug.call_count == unique_companies
@pytest.mark.parametrize("seed", list(range(2, 25, 5)))
def test_add_companies_corrupted_data(
    seed: int, mocker: MockerFixture, full_db: Session
) -> None:
    """Verify that one company with an empty name is reported as one invalid-data problem."""
    rnd_generator = np.random.default_rng(seed)
    # The batch size is drawn first, then that many (de-duplicated) company seeds.
    batch_size = rnd_generator.integers(4, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]
    # Corrupt the company in the middle of the batch.
    corrupted_position = len(companies) // 2
    companies[corrupted_position]["name"] = ""

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_not_called()
    spy_warning.assert_called_once_with(
        "When adding companies 1 problems occurred 1 where caused by invalid data."
    )
    expected_debug_calls = len(companies) - 1
    assert spy_debug.call_count == expected_debug_calls
@pytest.mark.parametrize("company_id", list(range(5)))
def test_add_relationship_no_relation(company_id: int, full_db: Session) -> None:
    """Verify that a relation without a role raises a ValueError."""
    missing_role = pytest.raises(ValueError, match="A relation type needs to be given.")
    with missing_role:
        data_transfer.add_relationship({}, company_id, full_db)
@pytest.mark.parametrize("company_id", list(range(5)))
def test_add_relationship_unknown_relation(company_id: int, full_db: Session) -> None:
    """Verify that a relation with an unknown role raises a ValueError."""
    relation = {"role": "something strange"}
    unknown_role = pytest.raises(
        ValueError, match="Relation type .* is not yet implemented!"
    )
    with unknown_role:
        data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    ("firstname", "surname", "date_of_birth"),
    [
        ("Max", "Mustermann", "2023-01-01"),
        ("Some Firstname", "Some Surname", "2023-01-01"),
        ("Other Firstname", "Other Surname", "1900-01-02"),
    ],
)
@pytest.mark.parametrize("role", ["Partner", "direktor", "liquidator"])
def test_add_relationship_person(  # noqa: PLR0913
    firstname: str,
    surname: str,
    date_of_birth: str,
    full_db: Session,
    company_id: int,
    role: str,
) -> None:
    """Verify that a relation between a person and a company can be added."""
    person_name = {"firstname": firstname, "lastname": surname}
    birthday = date.fromisoformat(date_of_birth)
    relation = {"name": person_name, "date_of_birth": birthday, "role": role}
    data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    ("firstname", "surname", "date_of_birth"),
    [
        ("Max", None, "2023-01-01"),
        (None, "Some Surname", "2023-01-01"),
        ("Other Firstname", "Other Surname", None),
    ],
)
@pytest.mark.parametrize("role", ["Partner"])
def test_add_relationship_person_missing_data(  # noqa: PLR0913
    firstname: str,
    surname: str,
    date_of_birth: str,
    full_db: Session,
    company_id: int,
    role: str,
    mocker: MockerFixture,
) -> None:
    """Verify that a personal relation with a missing name part or birthday is rejected."""
    mocker.spy(data_transfer.logger, "warning")
    person_name = {"firstname": firstname, "lastname": surname}
    # The birthday is handed over as given (a string here); falsy values become None.
    birthday = date_of_birth if date_of_birth else None
    relation = {"name": person_name, "date_of_birth": birthday, "role": role}
    expected_failure = pytest.raises(
        data_transfer.DataInvalidError, match="At least one of the three values name:"
    )
    with expected_failure:
        data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Some Company GmbH", None, None, 2),
        ("Some Company GmbH", None, "12345", 2),
        ("Some Company GmbH", "TV City", None, 3),
        ("Some Company GmbH", "TV City", "12345", 2),
        ("Some Company GmbH", "Strange City", "12345", 2),
        ("Some Company GmbH", "TV City", "?????", 2),
        ("Third Company GmbH", None, None, 1),
    ],
)
def test_add_relationship_company(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
) -> None:
    """Verify that a relation from a company to another company can be added."""
    partner_location = {"zip_code": zip_code, "city": city}
    relation = {
        "description": company_name,
        "location": partner_location,
        "role": "organisation",
    }
    data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Some Company GmbH", None, None, 1),
        ("Some Company GmbH", "TV City", "12345", 1),
        ("Some Company GmbH", "TV City", None, 1),
        ("Third Company GmbH", None, None, 3),
    ],
)
def test_add_relationship_company_self_reference(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
) -> None:
    """Verify that a company relation pointing back at the same company is rejected."""
    partner_location = {"zip_code": zip_code, "city": city}
    relation = {
        "description": company_name,
        "location": partner_location,
        "role": "organisation",
    }
    self_reference_failure = pytest.raises(
        data_transfer.DataInvalidError,
        match="For a valid relation both parties can't be the same entity.",
    )
    with self_reference_failure:
        data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Unknown GmbH", None, None, 2),
        ("Some Company GmbH", "Strange city", "?????", 2),
    ],
)
def test_add_relationship_company_unknown(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
) -> None:
    """Tests if a relationship to a company that can't be found raises a KeyError.

    Args:
        company_id: The id of the company the relation is added for.
        company_name: The name of the related company that should not be found.
        city: The city of the related company.
        zip_code: The zip code of the related company.
        full_db: The database session the relation is added with.
    """
    import re  # local import: only needed to build the literal match pattern

    # ``match`` is applied as a regular expression, so the interpolated company
    # name and the trailing dot are escaped to be compared literally.
    expected_message = re.escape(
        f"No corresponding company could be found to {company_name}."
    )
    with pytest.raises(KeyError, match=expected_message):
        data_transfer.add_relationship(
            {
                "description": company_name,
                "location": {
                    "zip_code": zip_code,
                    "city": city,
                },
                "role": "organisation",
            },
            company_id,
            full_db,
        )
@pytest.mark.parametrize("empty_relations", [[], [{}], [{"relationship": []}]])
def test_add_relationships_none(empty_relations: list, full_db: Session) -> None:
    """Testing what happens if an empty relation is added.

    Args:
        empty_relations: Company documents that carry no relationship entries.
        full_db: The database session the relations are added with.
    """
    # Hand over the parametrized documents; a literal empty list would make all
    # three cases exercise the very same input.
    data_transfer.add_relationships(empty_relations, full_db)
@pytest.mark.working_on()
@pytest.mark.parametrize(
    "documents",
    [
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [
                    {
                        "name": {"firstname": "Second person", "lastname": "Köstser"},
                        "date_of_birth": "1961-02-09",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                    },
                    {
                        "name": {"firstname": "First Person", "lastname": "Jifpa"},
                        "date_of_birth": "1976-04-20",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                    },
                    {
                        "name": {"firstname": "", "lastname": "Jiapa"},
                        "date_of_birth": "1976-04-20",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                    },
                    {
                        "name": {"firstname": "Something", "lastname": ""},
                        "date_of_birth": "12i3u",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                    },
                    {
                        "name": {"firstname": "First Person", "lastname": "Jipha"},
                        "date_of_birth": "1976-04-20",
                    },
                ],
                "yearly_results": {},
            }
        ]
    ],
)
def test_relationships(documents: list[dict[str, Any]], full_db: Session) -> None:
    """Add the relationships of a document and compare the resulting tables.

    Of the five relationship entries only the first two are expected to end up in
    the ``person``, ``relation`` and ``person_relation`` tables; the ``company``
    table is expected to hold three companies and ``company_relation`` none.
    """
    data_transfer.add_relationships(documents, full_db)
    bind = full_db.bind
    assert isinstance(bind, Engine)

    # company table
    company_table = pd.read_sql_table("company", bind)
    expected_company = pd.DataFrame(
        {
            "id": {0: 1, 1: 2, 2: 3},
            "hr": {0: "HRB 123", 1: "HRB 123", 2: "HRB 12"},
            "court_id": {0: 2, 1: 1, 2: 2},
            "name": {
                0: "Some Company GmbH",
                1: "Other Company GmbH",
                2: "Third Company GmbH",
            },
            "street": {0: "Sesamstr.", 1: "Sesamstr.", 2: None},
            "zip_code": {0: "12345", 1: "12345", 2: None},
            "city": {0: "TV City", 1: "TV City", 2: None},
            "last_update": {
                0: pd.Timestamp("2023-01-01 00:00:00"),
                1: pd.Timestamp("2023-01-01 00:00:00"),
                2: pd.Timestamp("2023-01-01 00:00:00"),
            },
            "sector": {0: None, 1: None, 2: None},
        }
    )
    pd.testing.assert_frame_equal(company_table, expected_company)

    # company_relation table: no company to company relation is expected
    company_relation_table = pd.read_sql_table("company_relation", bind)
    assert len(company_relation_table.index) == 0

    # person_relation table
    person_relation_table = pd.read_sql_table("person_relation", bind)
    expected_person_relation = pd.DataFrame(
        {"id": {0: 1, 1: 2}, "person_id": {0: 6, 1: 7}}
    )
    pd.testing.assert_frame_equal(person_relation_table, expected_person_relation)

    # relation table
    relation_table = pd.read_sql_table("relation", bind)
    expected_relation = pd.DataFrame(
        {
            "id": {0: 1, 1: 2},
            "company_id": {0: 1, 1: 1},
            "date_from": {0: pd.NaT, 1: pd.NaT},
            "date_to": {0: pd.NaT, 1: pd.NaT},
            "relation": {0: "GESCHAEFTSFUEHRER", 1: "GESCHAEFTSFUEHRER"},
        }
    )
    pd.testing.assert_frame_equal(relation_table, expected_relation)

    # person table: ids 6 and 7 are the two persons taken from the document
    person_table = pd.read_sql_table("person", bind)
    expected_person = pd.DataFrame(
        {
            "id": {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7},
            "name": {
                0: "Max",
                1: "Sabine",
                2: "Some Firstname",
                3: "Some Firstname",
                4: "Other Firstname",
                5: "Second person",
                6: "First Person",
            },
            "surname": {
                0: "Mustermann",
                1: "Mustermann",
                2: "Some Surname",
                3: "Some Surname",
                4: "Other Surname",
                5: "Köstser",
                6: "Jifpa",
            },
            "date_of_birth": {
                0: pd.Timestamp("2023-01-01 00:00:00"),
                1: pd.Timestamp("2023-01-01 00:00:00"),
                2: pd.Timestamp("2023-01-01 00:00:00"),
                3: pd.Timestamp("2023-01-02 00:00:00"),
                4: pd.Timestamp("2023-01-02 00:00:00"),
                5: pd.Timestamp("1961-02-09 00:00:00"),
                6: pd.Timestamp("1976-04-20 00:00:00"),
            },
            "works_for": {
                0: None,
                1: None,
                2: None,
                3: None,
                4: None,
                5: None,
                6: None,
            },
        }
    )
    pd.testing.assert_frame_equal(person_table, expected_person)