diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py index 8129c28..610433d 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py @@ -159,6 +159,24 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None: return None +def normalize_street(street: str) -> str: + """Normalize street names by extending them to `Straße` or `straße`. + + Args: + street (str): Name of street + + Returns: + str: Normalized street name + """ + if street is None: + return None + regex = r"(Str\.|Strasse)" + street = re.sub(regex, "Straße", street) + regex = r"(str\.|strasse)" + street = re.sub(regex, "straße", street) + return street.strip() + + def loc_from_beteiligung(data: dict) -> Location: """Extract the company location from the first relationship in the export. @@ -168,30 +186,30 @@ def loc_from_beteiligung(data: dict) -> Location: Returns: Location: location """ + base = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][ + "Beteiligter" + ]["Organisation"]["Anschrift"] + + house_number = None + street = None + if "Strasse" in base: + regex = r".(\d+)$" + hits = re.findall(regex, base["Strasse"]) + if len(hits) == 1: + house_number = hits[0] + street = base["Strasse"][: (-1 * len(house_number))] + if "Hausnummer" in base: + house_number = house_number + base["Hausnummer"] + else: + if "Hausnummer" in base: + house_number = base["Hausnummer"] + street = base["Strasse"] return Location( **{ - "city": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][0]["Beteiligter"]["Organisation"]["Anschrift"]["Ort"], - "zip_code": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][0]["Beteiligter"]["Organisation"]["Anschrift"]["Postleitzahl"], - "street": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][0]["Beteiligter"]["Organisation"]["Anschrift"]["Strasse"] - if "Strasse" - in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][ - "Beteiligter" - ]["Organisation"]["Anschrift"] - else None, - "house_number": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][0]["Beteiligter"]["Organisation"]["Anschrift"]["Hausnummer"] - if "Hausnummer" - in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][ - "Beteiligter" - ]["Organisation"]["Anschrift"] - else None, + "city": base["Ort"], + "zip_code": base["Postleitzahl"], + "street": normalize_street(street), # type: ignore + "house_number": house_number, } ) diff --git a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py index 03354f4..c328c27 100644 --- a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py +++ b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py @@ -4,7 +4,7 @@ from threading import Lock from bson.objectid import ObjectId from pymongo.results import InsertOneResult, UpdateResult -from aki_prj23_transparenzregister.models.company import Company +from aki_prj23_transparenzregister.models.company import Company, CompanyID from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector @@ -30,7 +30,7 @@ class CompanyMongoService: result = self.collection.find() return list(result) - def get_by_id(self, id: dict) -> dict | None: + def get_by_id(self, id: dict | CompanyID) -> dict | None: """Get a Company document by the given id. Args: @@ -39,22 +39,15 @@ class CompanyMongoService: Returns: dict | None: Company if found """ + if not isinstance(id, dict): + id = id.to_dict() + query = { + "id.hr_number": id["hr_number"], + "id.district_court.name": id["district_court"]["name"], + "id.district_court.city": id["district_court"]["city"], + } with self.lock: - result = list( - self.collection.find( - { - "id": { - "$eq": { - "hr_number": id["hr_number"], - "district_court": { - "name": id["district_court"]["name"], - "city": id["district_court"]["city"], - }, - } - } - } - ) - ) + result = list(self.collection.find(query)) if len(result) == 1: return result[0] return None @@ -130,7 +123,7 @@ class CompanyMongoService: Returns: InsertOneResult | UpdateResult: Result depending on action """ - entry = self.get_by_id(data.id.to_dict()) + entry = self.get_by_id(data.id) if entry is None: return self.insert(data) statement = {"$set": dict(data.to_dict().items())} diff --git a/tests/utils/data_extraction/unternehmensregister/transform_test.py b/tests/utils/data_extraction/unternehmensregister/transform_test.py index e1a68ec..b690765 100644 --- a/tests/utils/data_extraction/unternehmensregister/transform_test.py +++ b/tests/utils/data_extraction/unternehmensregister/transform_test.py @@ -4,6 +4,8 @@ import os from tempfile import TemporaryDirectory from unittest.mock import Mock, patch +import pytest + from aki_prj23_transparenzregister.models.company import ( Capital, CapitalTypeEnum, @@ -160,6 +162,122 @@ def test_loc_from_beteiligung() -> None: assert transform.loc_from_beteiligung(data) == expected_result +def test_loc_from_beteiligung_number_contained_in_street() -> None: + data = { + "XJustiz_Daten": { + "Grunddaten": { + "Verfahrensdaten": { + "Beteiligung": [ + { + "Beteiligter": { + "Beteiligtennummer": "1", + "Organisation": { + "Bezeichnung": { + "Bezeichnung_Aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG" + }, + "Anschrift": { + "Strasse": "Gewerbestraße8", + "Postleitzahl": "72535", + "Ort": "Heroldstatt", + }, + }, + } + }, + ] + } + } + } + } + + expected_result = Location( + city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535" + ) + assert transform.loc_from_beteiligung(data) == expected_result + + +def test_loc_from_beteiligung_no_result() -> None: + data = { + "XJustiz_Daten": { + "Grunddaten": { + "Verfahrensdaten": { + "Beteiligung": [ + { + "Beteiligter": { + "Beteiligtennummer": "1", + "Organisation": { + "Bezeichnung": { + "Bezeichnung_Aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG" + }, + "Anschrift": { + "Postleitzahl": "72535", + "Ort": "Heroldstatt", + }, + }, + } + }, + ] + } + } + } + } + + expected_result = Location( + city="Heroldstatt", house_number=None, street=None, zip_code="72535" + ) + assert transform.loc_from_beteiligung(data) == expected_result + + +def test_loc_from_beteiligung_combine() -> None: + data = { + "XJustiz_Daten": { + "Grunddaten": { + "Verfahrensdaten": { + "Beteiligung": [ + { + "Beteiligter": { + "Beteiligtennummer": "1", + "Organisation": { + "Bezeichnung": { + "Bezeichnung_Aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG" + }, + "Anschrift": { + "Postleitzahl": "72535", + "Strasse": "Pliangenserstr. 40", + "Hausnummer": "a", + "Ort": "Heroldstatt", + }, + }, + } + }, + ] + } + } + } + } + + expected_result = Location( + city="Heroldstatt", + house_number="40a", + street="Pliangenserstraße", + zip_code="72535", + ) + assert transform.loc_from_beteiligung(data) == expected_result + + +@pytest.mark.parametrize( + ("value", "expected_result"), + [ + (None, None), + ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"), + ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"), + ("Str. des Tests", "Straße des Tests"), + ], +) +def test_normalize_street(value: str, expected_result: str) -> None: + result = transform.normalize_street(value) + assert result == expected_result + + def test_name_from_beteiligung() -> None: data = { "XJustiz_Daten": {