aki_prj23_transparenzregister/tests/utils/data_transfer_test.py
Philipp Horstenkamp f8c111d7e2
Resolve mismatch between staging and prod db data for financials (#211)
SQL creation is now done dynamically from the definition of the enumeration
type.
2023-10-14 17:16:14 +02:00

1184 lines
40 KiB
Python

"""Test the transfer functions from mongodb to sql."""
import random
import string
import sys
from datetime import date
from typing import Any
import numpy as np
import pandas as pd
import pytest
import sqlalchemy as sa
from _pytest.monkeypatch import MonkeyPatch
from pytest_mock import MockerFixture
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from aki_prj23_transparenzregister.models.company import (
CapitalTypeEnum,
CompanyRelationshipEnum,
CompanyTypeEnum,
CurrencyEnum,
)
from aki_prj23_transparenzregister.utils import data_transfer
from aki_prj23_transparenzregister.utils.data_transfer import CompanyNotFoundError
from aki_prj23_transparenzregister.utils.sql import entities
@pytest.mark.parametrize(
    ("original", "expected"),
    [
        (
            {"name": "Amtsgericht Herne", "city": "Herne"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": ""},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": None},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": "Something Wrong"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
        (
            {"name": "Amtsgericht Herne", "city": "NoName"},
            {"name": "Amtsgericht Herne", "city": "Herne"},
        ),
    ],
)
def test_refine_district_court_entry(original: dict, expected: dict) -> None:
    """Tests the transformation/the cleaning of the district court entry.

    Args:
        original: The raw district court entry handed to the refinement.
        expected: The entry expected after the refinement; its values are
            compared, in order, with the returned tuple.
    """
    # Pass the parametrized raw entry. Previously an already clean, hard-coded
    # entry was passed, so the empty / None / wrong city cases were never run.
    refined = data_transfer._refine_district_court_entry(**original)
    assert refined == tuple(expected.values())
@pytest.mark.parametrize(
    "defect_data",
    [
        {"name": "Wrong Herne", "city": "Herne"},
        {"name": "Wrong Herne", "city": "NoName"},
        {"city": "Herne", "name": None},
        {"city": "Herne", "name": ""},
    ],
)
def test_refine_district_court_entry_defect_data(defect_data: dict[str, str]) -> None:
    """Checks that district court data which can't be repaired raises a DataInvalidError."""
    refine = data_transfer._refine_district_court_entry
    expected_error = data_transfer.DataInvalidError
    with pytest.raises(expected_error):
        refine(**defect_data)
@pytest.mark.repeat(3)
def test_empty_db_fixture(empty_db: Session) -> None:
    """Verifies that the empty_db fixture provides an SQLAlchemy session."""
    fixture_is_session = isinstance(empty_db, Session)
    assert fixture_is_session
@pytest.mark.parametrize(
    ("name", "city", "id"),
    [
        ("Amtsgericht Bochum", "Bochum", 1),
        ("Amtsgericht Dortmund", "Dortmund", 2),
        ("Amtsgericht Iserlohn", "Iserlohn", None),
    ],
)
def test__read_district_court_id(
    name: str, city: str, id: int | None, full_db: Session
) -> None:
    """Checks the lookup of a district court id; the unknown court is expected to yield None."""
    found_id = data_transfer._read_district_court_id(name, city, full_db)
    assert found_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str", "id"),
    [
        ("Max", "Mustermann", "2023-01-01", 1),
        ("Sabine", "Mustermann", "2023-01-01", 2),
        ("Some Firstname", "Some Surname", "2023-01-01", 3),
        ("Some Firstname", "Some Surname", "2023-01-02", 4),
        ("Other Firstname", "Other Surname", "2023-01-02", 5),
        (None, "Other Surname", "2023-01-02", None),
        ("Does not exist", "Other Surname", "2023-01-02", None),
        ("Other Firstname", "Does not exists", "2023-01-02", None),
        ("Other Firstname", "Other Surname", "1900-01-02", None),
        ("Other Firstname", None, "2023-01-02", None),
    ],
)
def test__read_person_id(
    firstname: str, surname: str, date_str: str, id: int | None, full_db: Session
) -> None:
    """Checks the lookup of a person id; unmatched combinations are expected to yield None."""
    birthday = date.fromisoformat(date_str)
    found_id = data_transfer._read_person_id(firstname, surname, birthday, full_db)
    assert found_id == id
@pytest.mark.parametrize(
    ("name", "city", "id"),
    [
        ("Amtsgericht Bochum", "Bochum", 1),
        ("Amtsgericht Dortmund", "Dortmund", 2),
        ("Amtsgericht Iserlohn", "Iserlohn", 3),
        ("Amtsgericht Köln", "Köln", 3),
    ],
)
def test_get_district_court_id(name: str, city: str, id: int, full_db: Session) -> None:
    """Checks that a court id is returned and that an unknown court gets the next free id."""
    court_id = data_transfer.get_district_court_id(name, city, full_db)
    assert court_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str", "id"),
    [
        ("Max", "Mustermann", "2023-01-01", 1),
        ("Sabine", "Mustermann", "2023-01-01", 2),
        ("Some Firstname", "Some Surname", "2023-01-01", 3),
        ("Some Firstname", "Some Surname", "2023-01-02", 4),
        ("Other Firstname", "Other Surname", "2023-01-02", 5),
        ("Does not exist", "Other Surname", "2023-01-02", 6),
        ("Other Firstname", "Does not exists", "2023-01-02", 6),
        ("Other Firstname", "Other Surname", "1900-01-02", 6),
    ],
)
def test_get_person_id(
    firstname: str, surname: str, date_str: str, id: int, full_db: Session
) -> None:
    """Checks that a person id is returned and that an unknown person gets the next free id."""
    birthday = date.fromisoformat(date_str)
    person_id = data_transfer.get_person_id(firstname, surname, birthday, full_db)
    assert person_id == id
@pytest.mark.parametrize(
    ("firstname", "surname", "date_str"),
    [
        ("", "Other Surname", "2023-01-02"),
        ("Other Firstname", "", "2023-01-02"),
        ("Other Firstname", "Other Surname", ""),
    ],
)
def test_get_person_id_value_check(
    firstname: str, surname: str, date_str: str | None, full_db: Session
) -> None:
    """Checks that an incomplete person (empty name part or no birthday) is rejected."""
    # An empty date string stands for a missing date of birth.
    birthday = date.fromisoformat(date_str) if date_str else None
    with pytest.raises(
        data_transfer.DataInvalidError, match="At least one of the three values name:"
    ):
        data_transfer.get_person_id(firstname, surname, birthday, full_db)
@pytest.mark.parametrize(
    ("name", "zip_code", "city", "id"),
    [
        ("Some Company GmbH", "", "", 1),
        ("Some Company GmbH", "58644", "", 1),
        ("Some Company GmbH", "58644", "TV City", 1),
        ("Some Company GmbH", "", "TV City", 1),
        ("Other Company GmbH", "", "", 2),
        ("Other Company GmbH", "58636", "", 2),
        ("Other Company GmbH", "58636", "TV City", 2),
        ("Other Company GmbH", "", "TV City", 2),
        ("Third Company GmbH", "", "", 3),
    ],
)
def test_get_company_id(
    name: str, zip_code: str, city: str, id: int | None, full_db: Session
) -> None:
    """Checks that known companies resolve to their id, with or without location details."""
    company_id = data_transfer.get_company_id(name, zip_code, city, full_db)
    assert company_id == id
@pytest.mark.parametrize(
    ("name", "zip_code", "city"),
    [
        ("Does not exist", "", ""),
        ("Does not exist", "41265", ""),
        ("Does not exist", "", "Some City"),
        ("Other Company GmbH", "TV City", "54321"),
        ("Other Company GmbH", "OtherCity", "12345"),
        ("Other Company GmbH", "OtherCity", "54321"),
    ],
)
def test_get_company_id_not_found(
    name: str,
    zip_code: str,
    city: str,
    full_db: Session,
) -> None:
    """Checks that looking up a company that can't be matched raises CompanyNotFoundError."""
    lookup = data_transfer.get_company_id
    with pytest.raises(CompanyNotFoundError):
        lookup(name, zip_code, city, full_db)
@pytest.mark.parametrize("name", ["", None])
def test_get_company_id_nameless(name: str | None, full_db: Session) -> None:
    """Checks that a lookup without a usable company name raises a DataInvalidError."""
    expected_error = data_transfer.DataInvalidError
    with pytest.raises(expected_error):
        data_transfer.get_company_id(name, "zip_code", "city", full_db)  # type: ignore
def get_random_string(length: int) -> str:
    """Builds a random string with the requested number of characters.

    Args:
        length: How many characters the result should contain.

    Returns:
        A string made of digits, ASCII letters and spaces.
    """
    alphabet = string.digits + string.ascii_letters + " "
    picked: list[str] = []
    # One random.choice call per character keeps seeded sequences reproducible.
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)
def get_random_zip() -> str:
    """Builds a random zip code consisting of five digits."""
    digits: list[str] = []
    for _ in range(5):
        digits.append(random.choice(string.digits))
    return "".join(digits)
def company_generator(seed: int) -> dict[str, Any]:
    """Creates a pseudo-random company document that is reproducible per seed.

    Args:
        seed: The value used to seed the random module before generating.

    Returns:
        A dictionary with the keys id, name, location, capital, last_update,
        company_type, founding_date and business_purpose.
    """
    random.seed(seed)

    def flip() -> bool:
        """Draws a random boolean."""
        return random.choice([True, False])

    # The statements below consume random numbers in a fixed order; reordering
    # them would change the document generated for a given seed.
    city = "Dortmund" if flip() else get_random_string(random.randint(5, 30))
    court_city = city if flip() else None
    hr_number = get_random_string(7)
    name = get_random_string(random.randint(3, 150))
    location_city = city if flip() else None
    zip_code = get_random_zip() if flip() else None
    street = get_random_string(20) if flip() else None
    capital_options = [
        {},
        None,
        {
            "value": random.randint(1000, 10000000),
            "currency": random.choice(["DM", "EUR"]),
            "type": random.choice(list(CapitalTypeEnum)),
        },
    ]
    capital = random.choice(capital_options)
    last_update = date(random.randint(2000, 2023), 1, 1).isoformat()
    company_type = random.choice(list(CompanyTypeEnum) + [None])  # type: ignore
    founding_date = date(
        random.randint(2000, 2023), random.randint(1, 12), random.randint(1, 28)
    ).isoformat()
    business_purpose = random.choice(["", "Some text", None])

    return {
        "id": {
            "district_court": {
                "name": f"Amtsgericht {city}",
                "city": court_city,
            },
            "hr_number": hr_number,
        },
        "name": name,
        "location": {
            "city": location_city,
            "zip_code": zip_code,
            "street": street,
        },
        "capital": capital,
        "last_update": last_update,
        "company_type": company_type,
        "founding_date": founding_date,
        "business_purpose": business_purpose,
    }
@pytest.mark.parametrize("seed", list(range(70, 75)))
def test_add_company(seed: int, full_db: Session) -> None:
    """Checks that a generated company can be added to the db without an error."""
    generated_company = company_generator(seed)
    data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_name(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Checks the rejection of a company whose name is missing."""
    company = company_generator(seed)
    company["name"] = overwrite
    if overwrite is not None:
        # NOTE(review): the empty and whitespace-only names are parametrized but
        # add_company is never called for them, so nothing is checked here.
        return
    with pytest.raises(
        data_transfer.DataInvalidError,
        match="The company name needs to be valid ",
    ):
        data_transfer.add_company(company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_city(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Checks that a company with an empty or missing city can still be added."""
    generated_company = company_generator(seed)
    generated_company["location"]["city"] = overwrite
    data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_zip_code(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Checks that a company with an empty or missing zip code can still be added."""
    generated_company = company_generator(seed)
    generated_company["location"]["zip_code"] = overwrite
    data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", [None])
def test_add_company_broken_date(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Checks that adding a company without a last_update date raises an IntegrityError."""
    generated_company = company_generator(seed)
    generated_company["last_update"] = overwrite
    expected_error = sa.exc.IntegrityError
    with pytest.raises(expected_error):
        data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_district_court(
    seed: int, overwrite: str | None, full_db: Session
) -> None:
    """Checks that a company with an unusable district court name is rejected."""
    generated_company = company_generator(seed)
    district_court = generated_company["id"]["district_court"]
    district_court["name"] = overwrite
    district_court["city"] = get_random_string(10)
    with pytest.raises(
        data_transfer.DataInvalidError,
        match="There is no court name|The name of the district court does not start correctly",
    ):
        data_transfer.add_company(generated_company, full_db)
@pytest.mark.parametrize("seed", list(range(0, 25, 5)))
def test_add_companies(seed: int, mocker: MockerFixture, full_db: Session) -> None:
    """Checks that a batch of distinct companies is added without reported problems."""
    rnd_generator = np.random.default_rng(seed)
    # Draw the batch size first, then the company seeds; the set removes duplicates.
    batch_size = rnd_generator.integers(1, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_called_once_with("When adding companies no problems occurred.")
    spy_warning.assert_not_called()
    assert spy_debug.call_count == len(companies)
@pytest.mark.parametrize("seed", list(range(1, 25, 5)))
def test_add_companies_duplicate(
    seed: int, mocker: MockerFixture, full_db: Session
) -> None:
    """Checks that duplicated companies in a batch are reported as problems."""
    rnd_generator = np.random.default_rng(seed)
    batch_size = rnd_generator.integers(4, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]
    unique_companies = len(companies)
    # Append the last three companies a second time to provoke duplicates.
    companies.extend(companies[-3:])

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_not_called()
    spy_warning.assert_called_once_with(
        "When adding companies 3 problems occurred 0 where caused by invalid data."
    )
    assert spy_debug.call_count == unique_companies
@pytest.mark.parametrize("seed", list(range(2, 25, 5)))
def test_add_companies_corrupted_data(
    seed: int, mocker: MockerFixture, full_db: Session
) -> None:
    """Checks that a company with an empty name is reported as invalid data."""
    rnd_generator = np.random.default_rng(seed)
    batch_size = rnd_generator.integers(4, 30)
    company_seeds = set(rnd_generator.integers(0, 1000, size=batch_size).tolist())
    companies: list[dict[str, Any]] = [
        company_generator(company_seed) for company_seed in company_seeds
    ]
    # Corrupt the company in the middle of the batch.
    corrupted_index = len(companies) // 2
    companies[corrupted_index]["name"] = ""

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_companies(companies, full_db)

    spy_info.assert_not_called()
    spy_warning.assert_called_once_with(
        "When adding companies 1 problems occurred 1 where caused by invalid data."
    )
    assert spy_debug.call_count == len(companies) - 1
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    ("firstname", "lastname", "date_of_birth"),
    [
        ("Max", "Mustermann", "2023-01-01"),
        ("Some Firstname", "Some Surname", "2023-01-01"),
        ("Other Firstname", "Other Surname", "1900-01-02"),
    ],
)
@pytest.mark.parametrize(
    "role", ["Prokurist(in)", "Geschäftsführer(in)", "Geschäftsführer"]
)
def test_add_relationship_person(  # noqa: PLR0913
    firstname: str,
    lastname: str,
    date_of_birth: str,
    full_db: Session,
    company_id: int,
    role: str,
) -> None:
    """Checks that a relation between a person and a company can be added."""
    person_name = {"firstname": firstname, "lastname": lastname}
    birthday = date.fromisoformat(date_of_birth)
    relation = {
        "name": person_name,
        "type": CompanyRelationshipEnum.PERSON.value,
        "date_of_birth": birthday,
        "role": role,
    }
    data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    ("firstname", "surname", "date_of_birth"),
    [
        ("Max", None, "2023-01-01"),
        (None, "Some Surname", "2023-01-01"),
    ],
)
@pytest.mark.parametrize("role", ["Partner"])
def test_add_relationship_person_missing_data(  # noqa: PLR0913
    firstname: str,
    surname: str,
    date_of_birth: str,
    full_db: Session,
    company_id: int,
    role: str,
    mocker: MockerFixture,
) -> None:
    """Checks that a person relation with a missing name part raises a DataInvalidError."""
    mocker.spy(data_transfer.logger, "warning")
    person_name = {"firstname": firstname, "lastname": surname}
    birthday = date_of_birth if date_of_birth else None
    relation = {
        "name": person_name,
        "date_of_birth": birthday,
        "role": role,
        "type": CompanyRelationshipEnum.PERSON.value,
    }
    with pytest.raises(
        data_transfer.DataInvalidError, match="At least one of the three values name:"
    ):
        data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Some Company GmbH", None, None, 2),
        ("Some Company GmbH", None, "12345", 2),
        ("Some Company GmbH", "TV City", None, 3),
        ("Some Company GmbH", "TV City", "12345", 2),
        ("Some Company GmbH", "Strange City", "12345", 2),
        ("Some Company GmbH", "TV City", "?????", 2),
        ("Third Company GmbH", None, None, 1),
    ],
)
def test_add_relationship_company(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
) -> None:
    """Checks that a relation from one company to another company can be added."""
    partner_location = {"zip_code": zip_code, "city": city}
    relation = {
        "name": company_name,
        "location": partner_location,
        "role": "organisation",
        "type": CompanyRelationshipEnum.COMPANY.value,
    }
    data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Some Company GmbH", None, None, 1),
        ("Some Company GmbH", "TV City", "12345", 1),
        ("Some Company GmbH", "TV City", None, 1),
        ("Third Company GmbH", None, None, 3),
    ],
)
def test_add_relationship_company_self_reference(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
) -> None:
    """Checks that a relation of a company with itself raises a DataInvalidError."""
    partner_location = {"zip_code": zip_code, "city": city}
    relation = {
        "name": company_name,
        "location": partner_location,
        "role": "organisation",
        "type": CompanyRelationshipEnum.COMPANY.value,
    }
    with pytest.raises(
        data_transfer.DataInvalidError,
        match="For a valid relation both parties can't be the same entity.",
    ):
        data_transfer.add_relationship(relation, company_id, full_db)
@pytest.mark.parametrize(
    ("company_name", "city", "zip_code", "company_id"),
    [
        ("Unknown GmbH", None, None, 2),
        ("Some Company GmbH", "Strange city", "?????", 2),
    ],
)
def test_add_relationship_company_unknown(
    company_id: int,
    company_name: str,
    city: str | None,
    zip_code: str | None,
    full_db: Session,
    mocker: MockerFixture,
) -> None:
    """Checks the logging when a relation points to a company that is not in the db."""
    spy_debug = mocker.spy(data_transfer.logger, "debug")
    spy_info = mocker.spy(data_transfer.logger, "info")
    partner_location = {"zip_code": zip_code, "city": city}
    # The parametrized company name goes into "description"; the relation's
    # "name" is the fixed text "company name".
    relation = {
        "description": company_name,
        "location": partner_location,
        "role": "organisation",
        "type": CompanyRelationshipEnum.COMPANY.value,
        "name": "company name",
    }
    data_transfer.add_relationship(relation, company_id, full_db)
    spy_debug.assert_called_once()
    spy_info.assert_not_called()
@pytest.mark.parametrize("empty_relations", [[], [{}], [{"relationship": []}]])
def test_add_relationships_none(empty_relations: list, full_db: Session) -> None:
    """Testing what happens if an empty relation is added.

    Args:
        empty_relations: Company documents that carry no relationships.
        full_db: Session of the prefilled test db.
    """
    # Pass the parametrized documents. Previously a literal empty list was
    # passed, so all three cases exercised the same input.
    # NOTE(review): assumes add_relationships tolerates company documents
    # without name/location - confirm against the implementation.
    data_transfer.add_relationships(empty_relations, full_db)
# noinspection SpellCheckingInspection
@pytest.mark.parametrize(
    "documents",
    [
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [
                    # Two complete person entries ...
                    {
                        "name": {"firstname": "Second person", "lastname": "Köstser"},
                        "date_of_birth": "1961-02-09",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                        "type": CompanyRelationshipEnum.PERSON.value,
                    },
                    {
                        "name": {"firstname": "First Person", "lastname": "Jifpa"},
                        "date_of_birth": "1976-04-20",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                        "type": CompanyRelationshipEnum.PERSON.value,
                    },
                    # ... followed by three defective ones: empty firstname,
                    # empty lastname with an unparsable date, and a missing
                    # firstname without a role.
                    {
                        "name": {"firstname": "", "lastname": "Jiapa"},
                        "date_of_birth": "1976-04-20",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                        "type": CompanyRelationshipEnum.PERSON.value,
                    },
                    {
                        "name": {"firstname": "Something", "lastname": ""},
                        "date_of_birth": "12i3u",
                        "location": {"city": "Stuttgart"},
                        "role": "Geschäftsführer",
                        "type": CompanyRelationshipEnum.PERSON.value,
                    },
                    {
                        "name": {"lastname": "Jipha"},
                        "date_of_birth": "1976-04-20",
                        "type": CompanyRelationshipEnum.PERSON.value,
                    },
                ],
                "yearly_results": {},
            }
        ]
    ],
)
def test_relationships(documents: list[dict[str, Any]], full_db: Session) -> None:
    """Testing to add lots of relations.

    Adds the relationships of one company document and compares the company,
    company_relation, person_relation, relation and person tables with the
    expected content. Only the two complete person entries are expected to be
    stored.

    Args:
        documents: The company documents whose relationships are added.
        full_db: Session of the prefilled test db.
    """
    data_transfer.add_relationships(documents, full_db)
    bind = full_db.bind
    assert isinstance(bind, Engine)
    # Expected frames are given as sparse {row: value} dicts per column; rows
    # without an entry become missing values in the constructed DataFrame.
    # The company rows are presumably the unchanged content of the full_db
    # fixture - verify against the fixture.
    pd.testing.assert_frame_equal(
        pd.read_sql_table("company", bind),
        pd.DataFrame(
            {
                "id": {0: 1, 1: 2, 2: 3},
                "hr": {0: "HRB 123", 1: "HRB 123", 2: "HRB 12"},
                "court_id": {0: 2, 1: 1, 2: 2},
                "name": {
                    0: "Some Company GmbH",
                    1: "Other Company GmbH",
                    2: "Third Company GmbH",
                },
                "company_type": {0: None, 1: None, 2: None},
                "founding_date": {0: pd.Timestamp(date.fromisoformat("2010-08-07"))},
                "business_purpose": {0: 'Say "Hello World"', 1: "Some purpose"},
                "street": {0: "Sesamstr.", 1: "Sesamstr.", 2: None},
                "house_number": {0: "4", 1: "8"},
                "zip_code": {0: "58644", 1: "58636"},
                "city": {0: "TV City", 1: "TV City"},
                "longitude": {0: 7.6968, 1: 7.7032},
                "latitude": {0: 51.3246, 1: 51.38},
                "pos_accuracy": {0: 4.0, 1: 4.0},
                "capital_value": {0: 1000000.0, 2: 10000.0},
                "original_currency": {0: "DEUTSCHE_MARK", 2: "EURO"},
                "capital_type": {0: "HAFTEINLAGE", 2: "GRUNDKAPITAL"},
                "last_update": {
                    0: pd.Timestamp("2023-01-01 00:00:00"),
                    1: pd.Timestamp("2023-01-01 00:00:00"),
                    2: pd.Timestamp("2023-01-01 00:00:00"),
                },
                "sector": {2: "Electronic"},
            }
        ),
    )
    # The document contains person relations only, so no company relation is expected.
    assert len(pd.read_sql_table("company_relation", bind).index) == 0
    # Two relations are expected, pointing to the persons with the ids 6 and 7.
    pd.testing.assert_frame_equal(
        pd.read_sql_table("person_relation", bind),
        pd.DataFrame({"id": {0: 1, 1: 2}, "person_id": {0: 6, 1: 7}}),
    )
    pd.testing.assert_frame_equal(
        pd.read_sql_table("relation", bind),
        pd.DataFrame(
            {
                "id": {0: 1, 1: 2},
                "company_id": {0: 1, 1: 1},
                "date_from": {0: pd.NaT, 1: pd.NaT},
                "date_to": {0: pd.NaT, 1: pd.NaT},
                "relation": {0: "Geschäftsführer", 1: "Geschäftsführer"},
            }
        ),
    )
    # Persons 6 and 7 are the two complete entries from the document; persons
    # 1-5 are presumably part of the full_db fixture - verify against the fixture.
    pd.testing.assert_frame_equal(
        pd.read_sql_table("person", bind),
        pd.DataFrame(
            {
                "id": {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7},
                "firstname": {
                    0: "Max",
                    1: "Sabine",
                    2: "Some Firstname",
                    3: "Some Firstname",
                    4: "Other Firstname",
                    5: "Second person",
                    6: "First Person",
                },
                "lastname": {
                    0: "Mustermann",
                    1: "Mustermann",
                    2: "Some Surname",
                    3: "Some Surname",
                    4: "Other Surname",
                    5: "Köstser",
                    6: "Jifpa",
                },
                "date_of_birth": {
                    0: pd.Timestamp("2023-01-01 00:00:00"),
                    1: pd.Timestamp("2023-01-01 00:00:00"),
                    2: pd.Timestamp("2023-01-01 00:00:00"),
                    3: pd.Timestamp("2023-01-02 00:00:00"),
                    4: pd.Timestamp("2023-01-02 00:00:00"),
                    5: pd.Timestamp("1961-02-09 00:00:00"),
                    6: pd.Timestamp("1976-04-20 00:00:00"),
                },
                "works_for": {_: None for _ in range(7)},
            }
        ),
    )
@pytest.mark.parametrize(
    "companies",
    [
        [],
        [{}],
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [],
                "yearly_results": {2023: {}, "2023": {}},
            },
        ],
    ],
)
def test_add_annual_financial_reports_no_call(
    companies: list[dict], full_db: Session, mocker: MockerFixture
) -> None:
    """Tests that no annual report is added when there is nothing to add.

    Args:
        companies: Company documents without any non-empty yearly result.
        full_db: Session of the prefilled test db.
        mocker: Fixture used to spy on the logger and to patch add_annual_report.
    """
    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )
    data_transfer.add_annual_financial_reports(companies, full_db)
    # Check the patched function itself. The former comparison of
    # `mocker.call.args` with `mocker.call.kwargs` inspected the generic `call`
    # helper instead of the patch and therefore could never fail.
    # NOTE(review): assumes empty yearly reports are skipped - confirm against
    # add_annual_financial_reports.
    mocked.assert_not_called()
    spy_warning.assert_not_called()
    spy_info.assert_called_once()
@pytest.mark.parametrize(
    "companies",
    [
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [],
                "yearly_results": {"i am not an int": {"auditor": {}}},
            }
        ],
    ],
)
def test_add_annual_financial_reports_defect_year(
    companies: list[dict], full_db: Session, mocker: MockerFixture
) -> None:
    """Tests that a yearly result with a non-numeric year is skipped with a warning.

    Args:
        companies: Company documents whose only yearly result has an invalid year.
        full_db: Session of the prefilled test db.
        mocker: Fixture used to spy on the logger and to patch add_annual_report.
    """
    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )
    data_transfer.add_annual_financial_reports(companies, full_db)
    # Check the patched function itself. The former comparison of
    # `mocker.call.args` with `mocker.call.kwargs` inspected the generic `call`
    # helper instead of the patch and therefore could never fail.
    mocked.assert_not_called()
    spy_warning.assert_called_once()
    spy_info.assert_called_once()
def test_add_annual_financial_reports(full_db: Session, mocker: MockerFixture) -> None:
    """Testing if financial reports are added correctly to the db.

    The company carries three yearly results with a numeric year and two whose
    year can't be cast to an int. Only the numeric ones are expected to be
    handed to add_annual_report, in document order.

    Args:
        full_db: Session of the prefilled test db.
        mocker: Fixture used to spy on the logger and to patch add_annual_report.
    """
    companies = [
        {
            "_id": {"$oid": "649f16a2ecc"},
            "id": {
                "hr_number": "HRB 123",
                "district_court": {
                    "name": "Amtsgericht Dortmund",
                    "city": "Dortmund",
                },
            },
            "location": {
                "city": "TV City",
                "zip_code": "12345",
                "street": "Sesamstr.",
                "house_number": "1",
            },
            "name": "Some Company GmbH",
            "last_update": "2023-05-04",
            "relationships": [],
            "yearly_results": {
                2023: {"some-text1": {}},
                "cast-me-to-int": {"some-text4": {}},
                2025: {"some-text2": {}},
                "cast-me-too": {"some-text5": {}},
                2024: {"some-text3": {}},
            },
        }
    ]
    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )

    data_transfer.add_annual_financial_reports(companies, full_db)

    # One warning per year that can't be cast to an int. The former
    # `assert_has_calls([])` passed regardless of what was logged.
    assert spy_warning.call_count == 2

    expected_calls: list[tuple[int, dict]] = [
        (2023, {"some-text1": {}}),
        (2025, {"some-text2": {}}),
        (2024, {"some-text3": {}}),
    ]
    # strict=True also fails the test if the number of calls differs.
    for (year, report), call_args in zip(
        expected_calls, mocked.call_args_list, strict=True
    ):
        assert call_args.args[0] == 1
        assert call_args.args[1] == year
        assert call_args.args[2] == report
        assert isinstance(call_args.kwargs["db"], Session)
        assert len(call_args.kwargs) == 1
    spy_info.assert_called_once()
@pytest.mark.parametrize("year", list(range(2000, 2025, 5)))
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    "empty_report",
    [{}, {"auditors": []}, {"financials": []}, {"auditors": [], "financials": []}],
)
def test_add_annual_report_empty(
    year: int,
    company_id: int,
    empty_report: dict,
    full_db: Session,
    mocker: MockerFixture,
) -> None:
    """Checks that an empty report logs one debug message and leaves the finance table unchanged."""
    table_name = entities.AnnualFinanceStatement.__tablename__
    table_before = pd.read_sql_table(table_name, full_db.bind)  # type: ignore
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_annual_report(company_id, year, empty_report, full_db)
    full_db.commit()

    spy_debug.assert_called_once()
    table_after = pd.read_sql_table(table_name, full_db.bind)  # type: ignore
    pd.testing.assert_frame_equal(table_before, table_after)
@pytest.mark.parametrize("year", [2015, 2023, 2024])
@pytest.mark.parametrize("company_id", [7, 8, 9])
@pytest.mark.parametrize(
    "empty_report",
    [{}, {"auditors": []}, {"financials": []}, {"auditors": [], "financials": []}],
)
def test_add_annual_report_to_unknown_company(
    year: int, company_id: int, empty_report: dict, full_db: Session
) -> None:
    """Checks that adding a report for an unregistered company id raises a KeyError."""
    expected_message = "The company with the id .* could not be found."
    with pytest.raises(KeyError, match=expected_message):
        data_transfer.add_annual_report(company_id, year, empty_report, full_db)
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize("year", [2023, 2025, 2020])
@pytest.mark.parametrize("short_term_debt", [2023.2, 2025.5, 2020.5, float("NaN")])
def test_add_annual_report(
    short_term_debt: float,
    company_id: int,
    year: int,
    finance_statements: list[dict[str, Any]],
    full_db: Session,
) -> None:
    """Checks that an annual report is appended to the finance statements table."""
    report = {
        "financials": {
            "revenue": 123,
            "ebitda": 235,
            "short_term_debt": short_term_debt,
        },
        "auditors": {},
    }
    data_transfer.add_annual_report(company_id, year, report, db=full_db)
    full_db.commit()

    # State of the table after the report was added.
    stored_statements = pd.read_sql_table(
        entities.AnnualFinanceStatement.__tablename__, full_db.bind  # type: ignore
    )

    added_row = {
        "id": 3,
        "company_id": company_id,
        "date": pd.to_datetime(date(year, 1, 1)),
        "revenue": 123.0,
        "ebitda": 235.0,
        "short_term_debt": short_term_debt,
    }
    expected_results = pd.DataFrame(finance_statements + [added_row])
    expected_results["date"] = pd.to_datetime(expected_results["date"])

    pd.testing.assert_frame_equal(
        expected_results,
        stored_statements,
        check_like=True,
    )
def test_add_annual_report_financial_key_error(full_db: Session) -> None:
    """Checks that a report with an unknown financial record type raises a TypeError."""
    report_with_unknown_key = {
        "financials": {"something-strange": 123.12},
        "auditors": {},
    }
    with pytest.raises(
        TypeError, match="is an invalid keyword argument for AnnualFinanceStatement"
    ):
        data_transfer.add_annual_report(2, 2023, report_with_unknown_key, db=full_db)
def test_company_relation_missing(empty_db: Session) -> None:
    """Checks that missing companies are recorded and counted in the missing company table."""
    data_transfer.company_relation_missing("Some_company", None, None, empty_db)
    empty_db.commit()
    data_transfer.company_relation_missing("Other_company", None, "some city", empty_db)
    empty_db.commit()
    # Second report for "Some_company", this time with location details.
    data_transfer.company_relation_missing(
        "Some_company",
        city="some city",
        zip_code="12345",
        street="some-street",
        db=empty_db,
    )
    empty_db.commit()

    stored = pd.read_sql_table(
        entities.MissingCompany.__tablename__, empty_db.bind  # type: ignore
    ).set_index("name")
    expected_rows = [
        {
            "name": "Some_company",
            "zip_code": "12345",
            "city": "some city",
            "number_of_links": 2,
            "searched_for": False,
        },
        {
            "name": "Other_company",
            "zip_code": None,
            "city": "some city",
            "number_of_links": 1,
            "searched_for": False,
        },
    ]
    expected = pd.DataFrame(expected_rows).set_index("name")
    pd.testing.assert_frame_equal(stored, expected)
def test_company_relation_missing_reset(empty_db: Session) -> None:
    """Checks that resetting the relation counter sets number_of_links to zero for every entry."""
    first_entry = entities.MissingCompany(
        name="Some Company",
        city="city",
        zip_code="12345",
        number_of_links=5,
        searched_for=True,
    )
    second_entry = entities.MissingCompany(
        name="Other Company",
        city="city2",
        zip_code="98765",
        number_of_links=1,
        searched_for=False,
    )
    empty_db.add_all([first_entry, second_entry])
    empty_db.commit()

    data_transfer.reset_relation_counter(empty_db)

    queried_df = pd.read_sql_table(
        entities.MissingCompany.__tablename__, empty_db.bind  # type: ignore
    ).set_index("name")
    expected_rows = [
        {
            "name": "Some Company",
            "zip_code": "12345",
            "city": "city",
            "number_of_links": 0,
            "searched_for": True,
        },
        {
            "name": "Other Company",
            "zip_code": "98765",
            "city": "city2",
            "number_of_links": 0,
            "searched_for": False,
        },
    ]
    expected_df = pd.DataFrame(expected_rows).set_index("name")
    pd.testing.assert_frame_equal(queried_df, expected_df)
@pytest.mark.parametrize("capital_type", [_.value for _ in CapitalTypeEnum])
@pytest.mark.parametrize("currency", ["", "EUR"])
def test_norm_capital_eur(currency: str, capital_type: str) -> None:
    """Checks that capital in EUR, or without a currency, is normed as EUR with its value kept."""
    raw_capital = {"value": 5, "currency": currency, "type": capital_type}
    normed = data_transfer.norm_capital(raw_capital)
    expected = {
        "capital_value": 5.0,
        "original_currency": CurrencyEnum("EUR"),
        "capital_type": CapitalTypeEnum(capital_type),
    }
    assert normed == expected
@pytest.mark.parametrize("capital_type", list(CapitalTypeEnum))
@pytest.mark.parametrize("currency", ["DM", "DEM"])
def test_norm_capital_dm(currency: str, capital_type: CapitalTypeEnum) -> None:
    """Checks that capital given in DM / DEM is converted and keeps DM as original currency."""
    raw_capital = {"value": 5, "currency": currency, "type": capital_type}
    normed = data_transfer.norm_capital(capital=raw_capital)
    expected = {
        "capital_value": 2.56,
        "original_currency": CurrencyEnum("DM"),
        "capital_type": CapitalTypeEnum(capital_type),
    }
    assert normed == expected
def test_norm_capital_fail() -> None:
    """Checks that an incomplete capital entry is normed to an empty dict."""
    incomplete_capital = {"something": "something"}
    normed = data_transfer.norm_capital(incomplete_capital)  # type: ignore
    assert normed == {}
@pytest.mark.parametrize(
    ("zip_code", "results"),
    [
        ("44809", {"latitude": 51.4997, "longitude": 7.1944, "pos_accuracy": 4.0}),
        (None, {}),
        ("", {}),
        ("60547", {}),
        ("58590", {}),
    ],
)
def test_get_geocodes(zip_code: str | None, results: dict) -> None:
    """Checks the geocodes returned for a zip code; an empty dict is expected when none are found."""
    geocodes = data_transfer.get_geocodes(zip_code)
    assert geocodes == results
def test_transfer_data_cli(monkeypatch: MonkeyPatch) -> None:
    """Checks that the cli exits when it is called without any argument."""
    program = sys.argv[0]
    monkeypatch.setattr(sys, "argv", [program])
    with pytest.raises(SystemExit):
        data_transfer.transfer_data_cli()
def test_transfer_data_cli_help(monkeypatch: MonkeyPatch) -> None:
    """Checks that the cli exits when it is called with the -h flag."""
    program = sys.argv[0]
    monkeypatch.setattr(sys, "argv", [program, "-h"])
    with pytest.raises(SystemExit):
        data_transfer.transfer_data_cli()
@pytest.mark.parametrize("upper", [True, False])
def test_transfer_data_cli_env(
    monkeypatch: MonkeyPatch, upper: bool, mocker: MockerFixture
) -> None:
    """Checks that the cli triggers exactly one data transfer for the env argument in either case."""
    if upper:
        cli_argument = "ENV"
    else:
        cli_argument = "env"
    monkeypatch.setattr(sys, "argv", [sys.argv[0], cli_argument])
    # Replace the real transfer by a no-op, then spy on the replacement.
    mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.transfer_data", lambda _: _
    )
    transfer_spy = mocker.spy(data_transfer, "transfer_data")
    data_transfer.transfer_data_cli()
    transfer_spy.assert_called_once()