diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/common.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/common.py
index 25f54b1..6d30ad6 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/common.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/common.py
@@ -1 +1,127 @@
 """Common functions for data transformation."""
+import re
+import typing
+from collections.abc import Sequence
+
+from aki_prj23_transparenzregister.models.company import (
+    CompanyRelationshipEnum,
+    CompanyToCompanyRelationship,
+    Location,
+    RelationshipRoleEnum,
+)
+from aki_prj23_transparenzregister.utils.string_tools import (
+    transform_date_to_iso,
+)
+
+
+def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any:
+    """Traverse a nested structure by following a list of keys.
+
+    Args:
+        data (dict): Data export
+        path (Sequence[str | int | object]): Keys to follow, outermost first
+
+    Raises:
+        KeyError: If a key along the path is not found
+
+    Returns:
+        typing.Any: Value at the end of the path
+    """
+    current = data
+    for key in path:
+        try:
+            current = current[key]
+        except KeyError as e:
+            raise KeyError(f"Key {key} not found") from e
+    return current
+
+
+def normalize_street(street: str | None) -> str | None:
+    """Normalize street names by extending them to `Straße` or `straße`.
+
+    Args:
+        street (str | None): Name of street
+
+    Returns:
+        str | None: Normalized street name, None if no street was given
+    """
+    if street is None:
+        return None
+    street = re.sub(r"(Str\.|Strasse)", "Straße", street)
+    street = re.sub(r"(str\.|strasse)", "straße", street)
+    return street.strip()
+
+
+def extract_date_from_string(value: str) -> str | None:
+    """Extract a date in ISO format from the given string if possible.
+
+    A pattern only counts when it occurs exactly once in the text, and a date
+    is only returned when exactly one pattern counted, so ambiguous input
+    yields None. A `YYYY-MM-DD` style match is returned exactly as found.
+
+    Args:
+        value (str): Input text
+
+    Returns:
+        str | None: Date in ISO format, None if not found or ambiguous
+    """
+    date_patterns: list[tuple[str, typing.Callable[..., typing.Any] | None]] = [
+        (r"\d{1,2}\.\d{1,2}\.\d{4}", transform_date_to_iso),
+        (r"\d{4}-\d{1,2}-\d{1,2}", None),
+    ]
+    results = []
+    for pattern, mapper in date_patterns:
+        matches = re.findall(pattern, value)
+        if len(matches) != 1:
+            continue
+        results.append(matches[0] if mapper is None else mapper(matches[0]))
+    if len(results) != 1:
+        return None
+    return results[0]
+
+
+def map_co_relation(data: dict) -> dict:
+    """Search for and map the c/o relation from location.street if possible.
+
+    The street is split at commas. The last part starting with `c/o` is removed
+    from the street; if it also names a company, a CARE_OF relationship is
+    appended to `data["relationships"]`. `data` is modified in place.
+
+    Args:
+        data (dict): Company dict with `location` and `relationships` entries
+
+    Returns:
+        dict: Modified Company dict
+    """
+    street = data["location"].street
+    if street is None:
+        return data
+    parts = [part.strip() for part in street.split(",")]
+    co_company = None
+    co_company_index = None
+    for index, part in enumerate(parts):
+        match = re.match(r"^c/o(.*)$", part)
+        if match is not None:
+            co_company = match.group(1).strip()
+            co_company_index = index
+    if co_company_index is not None:
+        del parts[co_company_index]
+        # Rejoin with the separator the street was split on; joining with ""
+        # would run the remaining address parts together.
+        street = ", ".join(part for part in parts if part)
+        data["location"].street = street
+
+    if co_company:
+        relation = CompanyToCompanyRelationship(
+            RelationshipRoleEnum.CARE_OF,  # type: ignore
+            Location(
+                data["location"].city,
+                street,
+                data["location"].house_number,
+                data["location"].zip_code,
+            ),
+            CompanyRelationshipEnum.COMPANY,  # type: ignore
+            co_company,
+        )
+        data["relationships"].append(relation)
+    return data
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py index d9e8868..834b1e5 100644 ---
a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py @@ -17,6 +17,11 @@ from aki_prj23_transparenzregister.models.company import ( PersonToCompanyRelationship, RelationshipRoleEnum, ) +from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import ( + extract_date_from_string, + map_co_relation, + normalize_street, +) from aki_prj23_transparenzregister.utils.string_tools import ( remove_traling_and_leading_quotes, transform_date_to_iso, @@ -149,24 +154,6 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None: return None -def normalize_street(street: str) -> str: - """Normalize street names by extending them to `Straße` or `straße`. - - Args: - street (str): Name of street - - Returns: - str: Normalized street name - """ - if street is None: - return None - regex = r"(Str\.|Strasse)" - street = re.sub(regex, "Straße", street) - regex = r"(str\.|strasse)" - street = re.sub(regex, "straße", street) - return street.strip() - - def loc_from_beteiligung(data: dict) -> Location: """Extract the company location from the first relationship in the export. @@ -338,33 +325,6 @@ def map_business_purpose(data: dict) -> str | None: return None -def extract_date_from_string(value: str) -> str | None: - """Extract a date in ISO format from the given string if possible. 
- - Args: - value (str): Input text - - Returns: - str | None: Date in ISO format, None if not found - """ - date_regex = [ # type: ignore - {"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso}, - {"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None}, - ] - results = [] - for regex in date_regex: - result = re.findall(regex["regex"], value) # type: ignore - if len(result) == 1: - relevant_data = result[0] - if regex["mapper"] is not None: # type: ignore - results.append(regex["mapper"](relevant_data)) # type: ignore - else: - results.append(relevant_data) - if len(results) != 1: - return None - return results[0] - - def map_founding_date(data: dict) -> str | None: """Extracts the founding date from a given Unternehmensregister export. @@ -457,48 +417,6 @@ def map_last_update(data: dict) -> str: return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["letzte_Eintragung"] -def map_co_relation(data: dict) -> dict: - """Search for and map the c/o relation from location.street if possible. 
- - Args: - data (dict): Company dict - - Returns: - dict: Modified Company dict - """ - street = data["location"].street - if street is None: - return data - parts = street.split(",") - co_company = None - co_company_index = None - for index, part in enumerate(parts): - trimmed_part = part.strip() - result = re.findall(r"^c\/o(.*)$", trimmed_part) - if len(result) == 1: - co_company = result[0].strip() - co_company_index = index - if co_company_index is not None: - del parts[co_company_index] - street = "".join(parts).strip() - data["location"].street = street - - if co_company is not None and co_company != "": - relation = CompanyToCompanyRelationship( - RelationshipRoleEnum.CARE_OF, # type: ignore - Location( - data["location"].city, - street, - data["location"].house_number, - data["location"].zip_code, - ), - CompanyRelationshipEnum.COMPANY, # type: ignore - co_company, - ) - data["relationships"].append(relation) - return data - - def map_unternehmensregister_json(data: dict) -> Company: """Processes the Unternehmensregister structured export to a Company by using several helper methods. 
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py index cdc9981..240231a 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py @@ -1,8 +1,6 @@ """Transform raw Unternehmensregister export (*.xml) to processed .json files for loading.""" import re -import typing -from collections.abc import Sequence from aki_prj23_transparenzregister.models.company import ( Capital, @@ -20,6 +18,11 @@ from aki_prj23_transparenzregister.models.company import ( PersonToCompanyRelationship, RelationshipRoleEnum, ) +from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import ( + map_co_relation, + normalize_street, + traversal, +) from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import ( RoleMapper, ) @@ -193,24 +196,6 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None: return None -def normalize_street(street: str) -> str: - """Normalize street names by extending them to `Straße` or `straße`. - - Args: - street (str): Name of street - - Returns: - str: Normalized street name - """ - if street is None: - return None - regex = r"(Str\.|Strasse)" - street = re.sub(regex, "Straße", street) - regex = r"(str\.|strasse)" - street = re.sub(regex, "straße", street) - return street.strip() - - def loc_from_beteiligung(data: dict) -> Location: """Extract the company location from the first relationship in the export. 
@@ -228,7 +213,6 @@ def loc_from_beteiligung(data: dict) -> Location: "tns:beteiligter", "tns:auswahl_beteiligter", "tns:organisation", - # "tns:anschrift", ] base = traversal(data, base_path) base = base["tns:anschrift"] if "tns:anschrift" in base else base["tns:sitz"] @@ -419,33 +403,6 @@ def map_business_purpose(data: dict) -> str | None: return None -def extract_date_from_string(value: str) -> str | None: - """Extract a date in ISO format from the given string if possible. - - Args: - value (str): Input text - - Returns: - str | None: Date in ISO format, None if not found - """ - date_regex = [ # type: ignore - {"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso}, - {"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None}, - ] - results = [] - for regex in date_regex: - result = re.findall(regex["regex"], value) # type: ignore - if len(result) == 1: - relevant_data = result[0] - if regex["mapper"] is not None: # type: ignore - results.append(regex["mapper"](relevant_data)) # type: ignore - else: - results.append(relevant_data) - if len(results) != 1: - return None - return results[0] - - def map_founding_date(data: dict) -> str | None: """Extracts the founding date from a given Unternehmensregister export. @@ -480,28 +437,6 @@ def map_founding_date(data: dict) -> str | None: return None -def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any: - """Traverse a dict using list of keys. - - Args: - data (dict): Data export - path (Sequence[str | int | object]): List of keys - - Raises: - KeyError: If key not found - - Returns: - any: Value at the end of the path - """ - current = data - for key in path: - try: - current = current[key] - except KeyError as e: - raise KeyError(f"Key {key} not found") from e - return current - - def map_hr_number(data: dict) -> str: """Extract the HR number from a given Unternehmensregister export. 
@@ -585,48 +520,7 @@ def map_last_update(data: dict) -> str: return traversal(data, path) -def map_co_relation(data: dict) -> dict: - """Search for and map the c/o relation from location.street if possible. - - Args: - data (dict): Company dict - - Returns: - dict: Modified Company dict - """ - street = data["location"].street - if street is None: - return data - parts = street.split(",") - co_company = None - co_company_index = None - for index, part in enumerate(parts): - trimmed_part = part.strip() - result = re.findall(r"^c\/o(.*)$", trimmed_part) - if len(result) == 1: - co_company = result[0].strip() - co_company_index = index - if co_company_index is not None: - del parts[co_company_index] - street = "".join(parts).strip() - data["location"].street = street - - if co_company is not None and co_company != "": - relation = CompanyToCompanyRelationship( - RelationshipRoleEnum.CARE_OF, # type: ignore - Location( - data["location"].city, - street, - data["location"].house_number, - data["location"].zip_code, - ), - CompanyRelationshipEnum.COMPANY, # type: ignore - co_company, - ) - data["relationships"].append(relation) - return data - - +# TODO class model with inheritance - only difference: Determine root in __init__ def map_unternehmensregister_json(data: dict) -> Company: """Processes the Unternehmensregister structured export to a Company by using several helper methods. @@ -651,7 +545,6 @@ def map_unternehmensregister_json(data: dict) -> Company: result["business_purpose"] = map_business_purpose(data) result["founding_date"] = map_founding_date(data) - # TODO adapt... 
for i in range( 2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"]) ):
diff --git a/tests/utils/data_extraction/unternehmensregister/transform/common_test.py b/tests/utils/data_extraction/unternehmensregister/transform/common_test.py
new file mode 100644
index 0000000..8a4c5b5
--- /dev/null
+++ b/tests/utils/data_extraction/unternehmensregister/transform/common_test.py
@@ -0,0 +1,140 @@
+"""Testing data_extraction/unternehmensregister/transform/common.py."""
+import pytest
+
+from aki_prj23_transparenzregister.models.company import (
+    CompanyRelationshipEnum,
+    CompanyToCompanyRelationship,
+    Location,
+    RelationshipRoleEnum,
+)
+from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
+    common,
+)
+
+
+def test_import_common() -> None:
+    assert common
+
+
+def test_traversal() -> None:
+    data = {"a": {"b": {"c": "d"}}}
+    assert common.traversal(data, ["a", "b", "c"]) == "d"
+
+
+def test_traversal_raises_key_error() -> None:
+    data = {"a": {"b": {"c": "d"}}}
+    with pytest.raises(KeyError):
+        common.traversal(data, ["a", "b", "d"])
+
+
+@pytest.mark.parametrize(
+    ("value", "expected_result"),
+    [
+        (None, None),
+        ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"),
+        ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"),
+        ("Str. des Tests", "Straße des Tests"),
+    ],
+)
+def test_normalize_street(value: str, expected_result: str) -> None:
+    result = common.normalize_street(value)
+    assert result == expected_result
+
+
+@pytest.mark.parametrize(
+    ("value", "expected_result"),
+    [
+        ("", None),
+        ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"),
+        ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"),
+        ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"),
+        ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"),
+        ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"),
+        ("Str. des Tests vom 1999-04-05", "1999-04-05"),
+        ("Once upon a midnight dreary while I pondered weak and weary...", None),
+        (
+            "This company was first founded in 2016-06-10 and then again on 1.5.2004",
+            None,
+        ),
+    ],
+)
+def test_extract_date_from_string(value: str, expected_result: str) -> None:
+    result = common.extract_date_from_string(value)
+    assert result == expected_result
+
+
+@pytest.mark.parametrize(
+    ("value", "expected_result"),
+    [
+        (
+            {
+                "location": Location(
+                    "", "c/o Youco24 Business Center, Abc ffda", None, None
+                ),
+                "relationships": [],
+            },
+            {
+                "location": Location("", "Abc ffda", None, None),
+                "relationships": [
+                    CompanyToCompanyRelationship(
+                        RelationshipRoleEnum.CARE_OF,  # type: ignore
+                        Location("", "Abc ffda", None, None),
+                        CompanyRelationshipEnum.COMPANY,
+                        "Youco24 Business Center",
+                    )
+                ],
+            },
+        ),
+        (
+            {
+                "location": Location(
+                    "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644"
+                ),
+                "relationships": [],
+            },
+            {
+                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
+                "relationships": [
+                    CompanyToCompanyRelationship(
+                        RelationshipRoleEnum.CARE_OF,  # type: ignore
+                        Location("Iserlohn", "Abc Str.", "42", "58644"),
+                        CompanyRelationshipEnum.COMPANY,
+                        "Youco24 Business Center",
+                    )
+                ],
+            },
+        ),
+        (
+            {
+                "location": Location(
+                    "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644"
+                ),
+                "relationships": [],
+            },
+            {
+                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
+                "relationships": [
+                    CompanyToCompanyRelationship(
+                        RelationshipRoleEnum.CARE_OF,  # type: ignore
+                        Location("Iserlohn", "Abc Str.", "42", "58644"),
+                        CompanyRelationshipEnum.COMPANY,
+                        "Youco24 Business Center",
+                    )
+                ],
+            },
+        ),
+        (
+            {
+                "location": Location("Iserlohn", "Abc Str., c/o", "42", "58644"),
+                "relationships": [],
+            },
+            {
+                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
+                "relationships": [],
+            },
+        ),
+    ],
+)
+def test_map_co_relation(value: dict, expected_result: dict) -> None:
+    result = common.map_co_relation(value)
+    assert result == expected_result
diff --git a/tests/utils/data_extraction/unternehmensregister/transform/v1_test.py b/tests/utils/data_extraction/unternehmensregister/transform/v1_test.py index 47c525c..34b8ead 100644 --- a/tests/utils/data_extraction/unternehmensregister/transform/v1_test.py +++ b/tests/utils/data_extraction/unternehmensregister/transform/v1_test.py @@ -1,8 +1,6 @@ """Testing utils/data_extraction/unternehmensregister/transform.py.""" from unittest.mock import Mock, patch -import pytest - from aki_prj23_transparenzregister.models.company import ( Capital, CapitalTypeEnum, @@ -266,20 +264,6 @@ def test_loc_from_beteiligung_combine() -> None: assert transform.loc_from_beteiligung(data) == expected_result -@pytest.mark.parametrize( - ("value", "expected_result"), - [ - (None, None), - ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"), - ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"), - ("Str. des Tests", "Straße des Tests"), - ], -) -def test_normalize_street(value: str, expected_result: str) -> None: - result = transform.normalize_street(value) - assert result == expected_result - - def test_name_from_beteiligung() -> None: data = { "XJustiz_Daten": { @@ -582,28 +566,6 @@ def test_map_business_purpose_no_result() -> None: assert result is None -@pytest.mark.parametrize( - ("value", "expected_result"), - [ - ("", None), - ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"), - ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"), - ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"), - ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"), - ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"), - ("Str.
des Tests vom 1999-04-05", "1999-04-05"), - ("Once upon a midnight dreary while I pondered weak and weary...", None), - ( - "This company was first founded in 2016-06-10 and then again on 1.5.2004", - None, - ), - ], -) -def test_extract_date_from_string(value: str, expected_result: str) -> None: - result = transform.extract_date_from_string(value) - assert result == expected_result - - def test_map_founding_date_from_tag_der_ersten_eintragung() -> None: data = { "some entry": "Tag der ersten Eintragung: 01.05.2004", @@ -690,83 +652,6 @@ def test_map_last_update() -> None: assert result == date -@pytest.mark.parametrize( - ("value", "expected_result"), - [ - ( - { - "location": Location( - "", "c/o Youco24 Business Center, Abc ffda", None, None - ), - "relationships": [], - }, - { - "location": Location("", "Abc ffda", None, None), - "relationships": [ - CompanyToCompanyRelationship( - RelationshipRoleEnum.CARE_OF, # type: ignore - Location("", "Abc ffda", None, None), - CompanyRelationshipEnum.COMPANY, - "Youco24 Business Center", - ) - ], - }, - ), - ( - { - "location": Location( - "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644" - ), - "relationships": [], - }, - { - "location": Location("Iserlohn", "Abc Str.", "42", "58644"), - "relationships": [ - CompanyToCompanyRelationship( - RelationshipRoleEnum.CARE_OF, # type: ignore - Location("Iserlohn", "Abc Str.", "42", "58644"), - CompanyRelationshipEnum.COMPANY, - "Youco24 Business Center", - ) - ], - }, - ), - ( - { - "location": Location( - "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644" - ), - "relationships": [], - }, - { - "location": Location("Iserlohn", "Abc Str.", "42", "58644"), - "relationships": [ - CompanyToCompanyRelationship( - RelationshipRoleEnum.CARE_OF, # type: ignore - Location("Iserlohn", "Abc Str.", "42", "58644"), - CompanyRelationshipEnum.COMPANY, - "Youco24 Business Center", - ) - ], - }, - ), - ( - { - "location": Location("Iserlohn", "Abc Str., c/o", 
"42", "58644"), - "relationships": [], - }, - { - "location": Location("Iserlohn", "Abc Str.", "42", "58644"), - "relationships": [], - }, - ), - ], -) -def test_map_co_relation(value: dict, expected_result: dict) -> None: - result = transform.map_co_relation(value) - assert result == expected_result - - @patch( "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_co_relation" )