test: Unit tests

This commit is contained in:
TrisNol 2023-11-04 10:32:35 +01:00
parent 7605858234
commit 61f94fa3b9
5 changed files with 277 additions and 315 deletions

View File

@ -1 +1,123 @@
"""Common functions for data transformation."""
import re
import typing
from collections.abc import Sequence
from aki_prj23_transparenzregister.models.company import (
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
Location,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.string_tools import (
transform_date_to_iso,
)
def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any:
"""Traverse a dict using list of keys.
Args:
data (dict): Data export
path (Sequence[str | int | object]): List of keys
Raises:
KeyError: If key not found
Returns:
any: Value at the end of the path
"""
current = data
for key in path:
try:
current = current[key]
except KeyError as e:
raise KeyError(f"Key {key} not found") from e
return current
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def extract_date_from_string(value: str) -> str | None:
"""Extract a date in ISO format from the given string if possible.
Args:
value (str): Input text
Returns:
str | None: Date in ISO format, None if not found
"""
date_regex = [ # type: ignore
{"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso},
{"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None},
]
results = []
for regex in date_regex:
result = re.findall(regex["regex"], value) # type: ignore
if len(result) == 1:
relevant_data = result[0]
if regex["mapper"] is not None: # type: ignore
results.append(regex["mapper"](relevant_data)) # type: ignore
else:
results.append(relevant_data)
if len(results) != 1:
return None
return results[0]
def map_co_relation(data: dict) -> dict:
    """Search for and map the c/o relation from location.street if possible.

    The street is split on commas. A segment starting with ``c/o`` is removed
    from the street and, if it names a company, appended to
    ``data["relationships"]`` as a care-of relationship. ``data`` is modified
    in place and returned.

    Args:
        data (dict): Company dict with a ``location`` and a ``relationships`` list

    Returns:
        dict: Modified Company dict
    """
    street = data["location"].street
    if street is None:
        return data

    segments = [segment.strip() for segment in street.split(",")]
    co_company = None
    co_company_index = None
    for index, segment in enumerate(segments):
        match = re.match(r"^c\/o(.*)$", segment)
        if match is not None:
            # If several segments start with c/o, the last one wins.
            co_company = match.group(1).strip()
            co_company_index = index

    if co_company_index is None:
        # No c/o segment: leave the street untouched.
        return data

    del segments[co_company_index]
    # Re-join with ", " so the remaining segments stay separated
    # (e.g. "A,B,c/o X" -> "A, B" rather than "AB").
    street = ", ".join(segment for segment in segments if segment)
    data["location"].street = street

    # A bare "c/o" without a name is dropped from the street but yields no relation.
    if co_company:
        data["relationships"].append(
            CompanyToCompanyRelationship(
                RelationshipRoleEnum.CARE_OF,  # type: ignore
                Location(
                    data["location"].city,
                    street,
                    data["location"].house_number,
                    data["location"].zip_code,
                ),
                CompanyRelationshipEnum.COMPANY,  # type: ignore
                co_company,
            )
        )
    return data

View File

@ -17,6 +17,11 @@ from aki_prj23_transparenzregister.models.company import (
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
extract_date_from_string,
map_co_relation,
normalize_street,
)
from aki_prj23_transparenzregister.utils.string_tools import (
remove_traling_and_leading_quotes,
transform_date_to_iso,
@ -149,24 +154,6 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
return None
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def loc_from_beteiligung(data: dict) -> Location:
"""Extract the company location from the first relationship in the export.
@ -338,33 +325,6 @@ def map_business_purpose(data: dict) -> str | None:
return None
def extract_date_from_string(value: str) -> str | None:
"""Extract a date in ISO format from the given string if possible.
Args:
value (str): Input text
Returns:
str | None: Date in ISO format, None if not found
"""
date_regex = [ # type: ignore
{"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso},
{"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None},
]
results = []
for regex in date_regex:
result = re.findall(regex["regex"], value) # type: ignore
if len(result) == 1:
relevant_data = result[0]
if regex["mapper"] is not None: # type: ignore
results.append(regex["mapper"](relevant_data)) # type: ignore
else:
results.append(relevant_data)
if len(results) != 1:
return None
return results[0]
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
@ -457,48 +417,6 @@ def map_last_update(data: dict) -> str:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["letzte_Eintragung"]
def map_co_relation(data: dict) -> dict:
    """Search for and map the c/o relation from location.street if possible.

    The street is split on commas. A segment starting with ``c/o`` is removed
    from the street and, if it names a company, appended to
    ``data["relationships"]`` as a care-of relationship. ``data`` is modified
    in place and returned.

    Args:
        data (dict): Company dict with a ``location`` and a ``relationships`` list

    Returns:
        dict: Modified Company dict
    """
    street = data["location"].street
    if street is None:
        return data

    segments = [segment.strip() for segment in street.split(",")]
    co_company = None
    co_company_index = None
    for index, segment in enumerate(segments):
        match = re.match(r"^c\/o(.*)$", segment)
        if match is not None:
            # If several segments start with c/o, the last one wins.
            co_company = match.group(1).strip()
            co_company_index = index

    if co_company_index is None:
        # No c/o segment: leave the street untouched.
        return data

    del segments[co_company_index]
    # Re-join with ", " so the remaining segments stay separated
    # (e.g. "A,B,c/o X" -> "A, B" rather than "AB").
    street = ", ".join(segment for segment in segments if segment)
    data["location"].street = street

    # A bare "c/o" without a name is dropped from the street but yields no relation.
    if co_company:
        data["relationships"].append(
            CompanyToCompanyRelationship(
                RelationshipRoleEnum.CARE_OF,  # type: ignore
                Location(
                    data["location"].city,
                    street,
                    data["location"].house_number,
                    data["location"].zip_code,
                ),
                CompanyRelationshipEnum.COMPANY,  # type: ignore
                co_company,
            )
        )
    return data
def map_unternehmensregister_json(data: dict) -> Company:
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.

View File

@ -1,8 +1,6 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import re
import typing
from collections.abc import Sequence
from aki_prj23_transparenzregister.models.company import (
Capital,
@ -20,6 +18,11 @@ from aki_prj23_transparenzregister.models.company import (
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
map_co_relation,
normalize_street,
traversal,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import (
RoleMapper,
)
@ -193,24 +196,6 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
return None
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def loc_from_beteiligung(data: dict) -> Location:
"""Extract the company location from the first relationship in the export.
@ -228,7 +213,6 @@ def loc_from_beteiligung(data: dict) -> Location:
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation",
# "tns:anschrift",
]
base = traversal(data, base_path)
base = base["tns:anschrift"] if "tns:anschrift" in base else base["tns:sitz"]
@ -419,33 +403,6 @@ def map_business_purpose(data: dict) -> str | None:
return None
def extract_date_from_string(value: str) -> str | None:
"""Extract a date in ISO format from the given string if possible.
Args:
value (str): Input text
Returns:
str | None: Date in ISO format, None if not found
"""
date_regex = [ # type: ignore
{"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso},
{"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None},
]
results = []
for regex in date_regex:
result = re.findall(regex["regex"], value) # type: ignore
if len(result) == 1:
relevant_data = result[0]
if regex["mapper"] is not None: # type: ignore
results.append(regex["mapper"](relevant_data)) # type: ignore
else:
results.append(relevant_data)
if len(results) != 1:
return None
return results[0]
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
@ -480,28 +437,6 @@ def map_founding_date(data: dict) -> str | None:
return None
def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any:
"""Traverse a dict using list of keys.
Args:
data (dict): Data export
path (Sequence[str | int | object]): List of keys
Raises:
KeyError: If key not found
Returns:
any: Value at the end of the path
"""
current = data
for key in path:
try:
current = current[key]
except KeyError as e:
raise KeyError(f"Key {key} not found") from e
return current
def map_hr_number(data: dict) -> str:
"""Extract the HR number from a given Unternehmensregister export.
@ -585,48 +520,7 @@ def map_last_update(data: dict) -> str:
return traversal(data, path)
def map_co_relation(data: dict) -> dict:
    """Search for and map the c/o relation from location.street if possible.

    The street is split on commas. A segment starting with ``c/o`` is removed
    from the street and, if it names a company, appended to
    ``data["relationships"]`` as a care-of relationship. ``data`` is modified
    in place and returned.

    Args:
        data (dict): Company dict with a ``location`` and a ``relationships`` list

    Returns:
        dict: Modified Company dict
    """
    street = data["location"].street
    if street is None:
        return data

    segments = [segment.strip() for segment in street.split(",")]
    co_company = None
    co_company_index = None
    for index, segment in enumerate(segments):
        match = re.match(r"^c\/o(.*)$", segment)
        if match is not None:
            # If several segments start with c/o, the last one wins.
            co_company = match.group(1).strip()
            co_company_index = index

    if co_company_index is None:
        # No c/o segment: leave the street untouched.
        return data

    del segments[co_company_index]
    # Re-join with ", " so the remaining segments stay separated
    # (e.g. "A,B,c/o X" -> "A, B" rather than "AB").
    street = ", ".join(segment for segment in segments if segment)
    data["location"].street = street

    # A bare "c/o" without a name is dropped from the street but yields no relation.
    if co_company:
        data["relationships"].append(
            CompanyToCompanyRelationship(
                RelationshipRoleEnum.CARE_OF,  # type: ignore
                Location(
                    data["location"].city,
                    street,
                    data["location"].house_number,
                    data["location"].zip_code,
                ),
                CompanyRelationshipEnum.COMPANY,  # type: ignore
                co_company,
            )
        )
    return data
# TODO: introduce a class model with inheritance; the only difference is how the root is determined in __init__
def map_unternehmensregister_json(data: dict) -> Company:
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
@ -651,7 +545,6 @@ def map_unternehmensregister_json(data: dict) -> Company:
result["business_purpose"] = map_business_purpose(data)
result["founding_date"] = map_founding_date(data)
# TODO adapt...
for i in range(
2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
):

View File

@ -0,0 +1,144 @@
"""Testing data_extraction/unternehmensregister/transform/common.py."""
import pytest
from aki_prj23_transparenzregister.models.company import (
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
Location,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
common,
)
def test_import_common() -> None:
    """Smoke test: the ``common`` module can be imported and is truthy."""
    assert common
def test_traversal() -> None:
    """Traversal follows the key path down to the nested value."""
    data = {"a": {"b": {"c": "d"}}}
    assert common.traversal(data, ["a", "b", "c"]) == "d"


def test_traversal_raises_key_error() -> None:
    """A key missing along the path raises ``KeyError`` naming that key."""
    data = {"a": {"b": {"c": "d"}}}
    with pytest.raises(KeyError, match="Key d not found"):
        common.traversal(data, ["a", "b", "d"])
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (None, None),
        ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"),
        ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"),
        ("Str. des Tests", "Straße des Tests"),
    ],
)
def test_normalize_street(value: str, expected_result: str) -> None:
    """Each street input is normalized to the expected value; ``None`` stays ``None``."""
    assert common.normalize_street(value) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        ("", None),
        ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"),
        ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"),
        ("Str. des Tests vom 1999-04-05", "1999-04-05"),
        ("Once upon a midnight dreary while I pondered weak and weary...", None),
        (
            "This company was first founded in 2016-06-10 and then again on 1.5.2004",
            None,
        ),
    ],
)
def test_extract_date_from_string(value: str, expected_result: str) -> None:
    """Each input text yields the expected date string, or ``None``."""
    assert common.extract_date_from_string(value) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (
            {
                "location": Location(
                    "", "c/o Youco24 Business Center, Abc ffda", None, None
                ),
                "relationships": [],
            },
            {
                "location": Location("", "Abc ffda", None, None),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("", "Abc ffda", None, None),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location(
                    "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location(
                    "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location("Iserlohn", "Abc Str., c/o", "42", "58644"),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [],
            },
        ),
    ],
)
def test_map_co_relation(value: dict, expected_result: dict) -> None:
    """The c/o segment leaves the street; a named c/o company becomes a relationship."""
    assert common.map_co_relation(value) == expected_result

View File

@ -1,8 +1,6 @@
"""Testing utils/data_extraction/unternehmensregister/transform.py."""
from unittest.mock import Mock, patch
import pytest
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
@ -266,20 +264,6 @@ def test_loc_from_beteiligung_combine() -> None:
assert transform.loc_from_beteiligung(data) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (None, None),
        ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"),
        ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"),
        ("Str. des Tests", "Straße des Tests"),
    ],
)
def test_normalize_street(value: str, expected_result: str) -> None:
    """Each street input is normalized to the expected value; ``None`` stays ``None``."""
    assert transform.normalize_street(value) == expected_result
def test_name_from_beteiligung() -> None:
data = {
"XJustiz_Daten": {
@ -582,28 +566,6 @@ def test_map_business_purpose_no_result() -> None:
assert result is None
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        ("", None),
        ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"),
        ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"),
        ("Str. des Tests vom 1999-04-05", "1999-04-05"),
        ("Once upon a midnight dreary while I pondered weak and weary...", None),
        (
            "This company was first founded in 2016-06-10 and then again on 1.5.2004",
            None,
        ),
    ],
)
def test_extract_date_from_string(value: str, expected_result: str) -> None:
    """Each input text yields the expected date string, or ``None``."""
    assert transform.extract_date_from_string(value) == expected_result
def test_map_founding_date_from_tag_der_ersten_eintragung() -> None:
data = {
"some entry": "Tag der ersten Eintragung: 01.05.2004",
@ -690,83 +652,6 @@ def test_map_last_update() -> None:
assert result == date
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (
            {
                "location": Location(
                    "", "c/o Youco24 Business Center, Abc ffda", None, None
                ),
                "relationships": [],
            },
            {
                "location": Location("", "Abc ffda", None, None),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("", "Abc ffda", None, None),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location(
                    "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location(
                    "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        (
            {
                "location": Location("Iserlohn", "Abc Str., c/o", "42", "58644"),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [],
            },
        ),
    ],
)
def test_map_co_relation(value: dict, expected_result: dict) -> None:
    """The c/o segment leaves the street; a named c/o company becomes a relationship."""
    assert transform.map_co_relation(value) == expected_result
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_co_relation"
)