mirror of
https://github.com/fhswf/aki_prj23_transparenzregister.git
synced 2025-06-22 07:33:56 +02:00
test: Increase test coverage and refactor v3
This commit is contained in:
@ -1,11 +1,17 @@
|
|||||||
"""Common functions for data transformation."""
|
"""Common functions for data transformation."""
|
||||||
|
import abc
|
||||||
import re
|
import re
|
||||||
import typing
|
import typing
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.models.company import (
|
from aki_prj23_transparenzregister.models.company import (
|
||||||
|
Capital,
|
||||||
|
Company,
|
||||||
|
CompanyID,
|
||||||
|
CompanyRelationship,
|
||||||
CompanyRelationshipEnum,
|
CompanyRelationshipEnum,
|
||||||
CompanyToCompanyRelationship,
|
CompanyToCompanyRelationship,
|
||||||
|
CompanyTypeEnum,
|
||||||
Location,
|
Location,
|
||||||
RelationshipRoleEnum,
|
RelationshipRoleEnum,
|
||||||
)
|
)
|
||||||
@ -121,3 +127,130 @@ def map_co_relation(data: dict) -> dict:
|
|||||||
)
|
)
|
||||||
data["relationships"].append(relation)
|
data["relationships"].append(relation)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class BaseTransformer(metaclass=abc.ABCMeta):
|
||||||
|
"""Generic abstract class for data transformation between Unternehmensregister and Transparenzregister API."""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def parse_date_of_birth(self, data: dict) -> str | None:
|
||||||
|
"""Retreives the date of birth from a stakeholder entry if possible.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Stakeholder data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str | None: date of birth or None if not found
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
|
||||||
|
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompanyRelationship | None: Relationship if it could be processed
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def loc_from_beteiligung(self, data: dict) -> Location:
|
||||||
|
"""Extract the company location from the first relationship in the export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Location: location
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def name_from_beteiligung(self, data: dict) -> str:
|
||||||
|
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Company name
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
|
||||||
|
"""Extracts the company type from a given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
company_name (str): Name of the company as a fallback solution
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompanyTypeEnum | None: Company type if found
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_capital(self, data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
||||||
|
"""Extracts the company capital from the given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Capital | None: Company Capital if found
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_business_purpose(self, data: dict) -> str | None:
|
||||||
|
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str | None: Business purpose if found
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_founding_date(self, data: dict) -> str | None:
|
||||||
|
"""Extracts the founding date from a given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str | None: Founding date if found
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_company_id(self, data: dict) -> CompanyID:
|
||||||
|
"""Retrieve Company ID from export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CompanyID: ID of the company
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_last_update(self, data: dict) -> str:
|
||||||
|
"""Extract last update date from export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Unternehmensregister export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Last update date
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map_unternehmensregister_json(self, data: dict) -> Company:
|
||||||
|
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Company: Transformed data
|
||||||
|
"""
|
||||||
|
@ -4,13 +4,15 @@ import glob
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import typing
|
|
||||||
|
|
||||||
import xmltodict
|
import xmltodict
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.models.company import Company
|
from aki_prj23_transparenzregister.models.company import Company
|
||||||
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
|
||||||
|
BaseTransformer,
|
||||||
|
)
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import (
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import (
|
||||||
v1,
|
v1,
|
||||||
)
|
)
|
||||||
@ -42,7 +44,7 @@ def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
|
|||||||
logger.error(e)
|
logger.error(e)
|
||||||
|
|
||||||
|
|
||||||
def determine_version(data: dict) -> typing.Any:
|
def determine_version(data: dict) -> BaseTransformer:
|
||||||
"""Determine Unternehmensregister data API version of given entry.
|
"""Determine Unternehmensregister data API version of given entry.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -56,9 +58,9 @@ def determine_version(data: dict) -> typing.Any:
|
|||||||
"""
|
"""
|
||||||
if "XJustiz_Daten" in data:
|
if "XJustiz_Daten" in data:
|
||||||
# TODO consider class inheritance for version modules
|
# TODO consider class inheritance for version modules
|
||||||
return v1
|
return v1.V1_Transformer()
|
||||||
if "tns:nachrichtenkopf" in data[list(data.keys())[0]]:
|
if "tns:nachrichtenkopf" in data[list(data.keys())[0]]:
|
||||||
return v3
|
return v3.V3_Transformer()
|
||||||
raise ValueError("Could not determine Unternehmensregister version.")
|
raise ValueError("Could not determine Unternehmensregister version.")
|
||||||
|
|
||||||
|
|
||||||
@ -77,6 +79,7 @@ def map_unternehmensregister_json(data: dict) -> Company:
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
|
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
|
||||||
|
# TODO Adapt to new structure with different versions
|
||||||
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
|
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
|
||||||
path = os.path.join(f"{base_path}/export", file)
|
path = os.path.join(f"{base_path}/export", file)
|
||||||
with open(path, encoding="utf-8") as file_object:
|
with open(path, encoding="utf-8") as file_object:
|
||||||
|
@ -18,6 +18,7 @@ from aki_prj23_transparenzregister.models.company import (
|
|||||||
RelationshipRoleEnum,
|
RelationshipRoleEnum,
|
||||||
)
|
)
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
|
||||||
|
BaseTransformer,
|
||||||
extract_date_from_string,
|
extract_date_from_string,
|
||||||
map_co_relation,
|
map_co_relation,
|
||||||
normalize_street,
|
normalize_street,
|
||||||
@ -28,7 +29,10 @@ from aki_prj23_transparenzregister.utils.string_tools import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_date_of_birth(data: dict) -> str | None:
|
class V1_Transformer(BaseTransformer): # noqa: N801
|
||||||
|
"""Transformer for data exports from Unternehmensregister (v1)."""
|
||||||
|
|
||||||
|
def parse_date_of_birth(self, data: dict) -> str | None:
|
||||||
"""Retreives the date of birth from a stakeholder entry if possible.
|
"""Retreives the date of birth from a stakeholder entry if possible.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -43,8 +47,7 @@ def parse_date_of_birth(data: dict) -> str | None:
|
|||||||
return base
|
return base
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
|
||||||
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|
||||||
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -55,7 +58,10 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
"""
|
"""
|
||||||
if "Natuerliche_Person" in data["Beteiligter"]:
|
if "Natuerliche_Person" in data["Beteiligter"]:
|
||||||
# It's a Company serving as a "Kommanditist" or similar
|
# It's a Company serving as a "Kommanditist" or similar
|
||||||
if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
|
if (
|
||||||
|
data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"]
|
||||||
|
is None
|
||||||
|
):
|
||||||
return CompanyToCompanyRelationship(
|
return CompanyToCompanyRelationship(
|
||||||
**{ # type: ignore
|
**{ # type: ignore
|
||||||
"name": remove_traling_and_leading_quotes(
|
"name": remove_traling_and_leading_quotes(
|
||||||
@ -69,12 +75,14 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
"Anschrift"
|
"Anschrift"
|
||||||
][-1]["Ort"]
|
][-1]["Ort"]
|
||||||
if isinstance(
|
if isinstance(
|
||||||
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
|
data["Beteiligter"]["Natuerliche_Person"][
|
||||||
|
"Anschrift"
|
||||||
|
],
|
||||||
list,
|
list,
|
||||||
)
|
)
|
||||||
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
|
else data["Beteiligter"]["Natuerliche_Person"][
|
||||||
"Ort"
|
"Anschrift"
|
||||||
]
|
]["Ort"]
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
"role": RelationshipRoleEnum(
|
"role": RelationshipRoleEnum(
|
||||||
@ -95,14 +103,15 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
]["Nachname"],
|
]["Nachname"],
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
"date_of_birth": parse_date_of_birth(data),
|
"date_of_birth": self.parse_date_of_birth(data),
|
||||||
"location": Location(
|
"location": Location(
|
||||||
**{
|
**{
|
||||||
"city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
|
"city": data["Beteiligter"]["Natuerliche_Person"][
|
||||||
-1
|
"Anschrift"
|
||||||
]["Ort"]
|
][-1]["Ort"]
|
||||||
if isinstance(
|
if isinstance(
|
||||||
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list
|
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
|
||||||
|
list,
|
||||||
)
|
)
|
||||||
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
|
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
|
||||||
"Ort"
|
"Ort"
|
||||||
@ -128,11 +137,14 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
),
|
),
|
||||||
"location": Location(
|
"location": Location(
|
||||||
**{
|
**{
|
||||||
"city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"],
|
"city": data["Beteiligter"]["Organisation"]["Anschrift"][
|
||||||
|
"Ort"
|
||||||
|
],
|
||||||
"street": data["Beteiligter"]["Organisation"]["Anschrift"][
|
"street": data["Beteiligter"]["Organisation"]["Anschrift"][
|
||||||
"Strasse"
|
"Strasse"
|
||||||
]
|
]
|
||||||
if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"]
|
if "Strasse"
|
||||||
|
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
||||||
else None,
|
else None,
|
||||||
"house_number": data["Beteiligter"]["Organisation"][
|
"house_number": data["Beteiligter"]["Organisation"][
|
||||||
"Anschrift"
|
"Anschrift"
|
||||||
@ -140,9 +152,9 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
if "Hausnummer"
|
if "Hausnummer"
|
||||||
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
||||||
else None,
|
else None,
|
||||||
"zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][
|
"zip_code": data["Beteiligter"]["Organisation"][
|
||||||
"Postleitzahl"
|
"Anschrift"
|
||||||
]
|
]["Postleitzahl"]
|
||||||
if "Postleitzahl"
|
if "Postleitzahl"
|
||||||
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
||||||
else None,
|
else None,
|
||||||
@ -153,8 +165,7 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def loc_from_beteiligung(self, data: dict) -> Location:
|
||||||
def loc_from_beteiligung(data: dict) -> Location:
|
|
||||||
"""Extract the company location from the first relationship in the export.
|
"""Extract the company location from the first relationship in the export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -190,8 +201,7 @@ def loc_from_beteiligung(data: dict) -> Location:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def name_from_beteiligung(self, data: dict) -> str:
|
||||||
def name_from_beteiligung(data: dict) -> str:
|
|
||||||
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -205,8 +215,7 @@ def name_from_beteiligung(data: dict) -> str:
|
|||||||
]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
|
]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
|
||||||
return remove_traling_and_leading_quotes(name)
|
return remove_traling_and_leading_quotes(name)
|
||||||
|
|
||||||
|
def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
|
||||||
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|
||||||
"""Extracts the company type from a given Unternehmensregister export.
|
"""Extracts the company type from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -235,8 +244,7 @@ def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|||||||
return CompanyTypeEnum("Kommanditgesellschaft")
|
return CompanyTypeEnum("Kommanditgesellschaft")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_capital(self, data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
||||||
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
|
||||||
"""Extracts the company capital from the given Unternehmensregister export.
|
"""Extracts the company capital from the given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -258,7 +266,9 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
|||||||
if isinstance(base, list):
|
if isinstance(base, list):
|
||||||
for entry in base:
|
for entry in base:
|
||||||
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
|
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
|
||||||
capital["Zahl"] = capital["Zahl"] + float(entry["Hafteinlage"]["Zahl"])
|
capital["Zahl"] = capital["Zahl"] + float(
|
||||||
|
entry["Hafteinlage"]["Zahl"]
|
||||||
|
)
|
||||||
capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"]
|
capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"]
|
||||||
elif isinstance(base, dict):
|
elif isinstance(base, dict):
|
||||||
capital = base["Hafteinlage"]
|
capital = base["Hafteinlage"]
|
||||||
@ -307,8 +317,7 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def map_business_purpose(self, data: dict) -> str | None:
|
||||||
def map_business_purpose(data: dict) -> str | None:
|
|
||||||
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -324,8 +333,7 @@ def map_business_purpose(data: dict) -> str | None:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_founding_date(self, data: dict) -> str | None:
|
||||||
def map_founding_date(data: dict) -> str | None:
|
|
||||||
"""Extracts the founding date from a given Unternehmensregister export.
|
"""Extracts the founding date from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -358,8 +366,7 @@ def map_founding_date(data: dict) -> str | None:
|
|||||||
# No reliable answer
|
# No reliable answer
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_company_id(self, data: dict) -> CompanyID:
|
||||||
def map_company_id(data: dict) -> CompanyID:
|
|
||||||
"""Retrieve Company ID from export.
|
"""Retrieve Company ID from export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -404,8 +411,7 @@ def map_company_id(data: dict) -> CompanyID:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def map_last_update(self, data: dict) -> str:
|
||||||
def map_last_update(data: dict) -> str:
|
|
||||||
"""Extract last update date from export.
|
"""Extract last update date from export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -414,10 +420,11 @@ def map_last_update(data: dict) -> str:
|
|||||||
Returns:
|
Returns:
|
||||||
str: Last update date
|
str: Last update date
|
||||||
"""
|
"""
|
||||||
return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["letzte_Eintragung"]
|
return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"][
|
||||||
|
"letzte_Eintragung"
|
||||||
|
]
|
||||||
|
|
||||||
|
def map_unternehmensregister_json(self, data: dict) -> Company:
|
||||||
def map_unternehmensregister_json(data: dict) -> Company:
|
|
||||||
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -429,21 +436,22 @@ def map_unternehmensregister_json(data: dict) -> Company:
|
|||||||
result: dict = {"relationships": []}
|
result: dict = {"relationships": []}
|
||||||
|
|
||||||
# TODO Refactor mapping - this is a nightmare...
|
# TODO Refactor mapping - this is a nightmare...
|
||||||
result["id"] = map_company_id(data)
|
result["id"] = self.map_company_id(data)
|
||||||
result["name"] = name_from_beteiligung(data)
|
result["name"] = self.name_from_beteiligung(data)
|
||||||
|
|
||||||
result["location"] = loc_from_beteiligung(data)
|
result["location"] = self.loc_from_beteiligung(data)
|
||||||
result["last_update"] = map_last_update(data)
|
result["last_update"] = self.map_last_update(data)
|
||||||
|
|
||||||
result["company_type"] = map_rechtsform(result["name"], data)
|
result["company_type"] = self.map_rechtsform(result["name"], data)
|
||||||
result["capital"] = map_capital(data, result["company_type"])
|
result["capital"] = self.map_capital(data, result["company_type"])
|
||||||
result["business_purpose"] = map_business_purpose(data)
|
result["business_purpose"] = self.map_business_purpose(data)
|
||||||
result["founding_date"] = map_founding_date(data)
|
result["founding_date"] = self.map_founding_date(data)
|
||||||
|
|
||||||
for i in range(
|
for i in range(
|
||||||
2, len(data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"])
|
2,
|
||||||
|
len(data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"]),
|
||||||
):
|
):
|
||||||
people = parse_stakeholder(
|
people = self.parse_stakeholder(
|
||||||
data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][i]
|
data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][i]
|
||||||
)
|
)
|
||||||
result["relationships"].append(people)
|
result["relationships"].append(people)
|
||||||
|
@ -19,6 +19,7 @@ from aki_prj23_transparenzregister.models.company import (
|
|||||||
RelationshipRoleEnum,
|
RelationshipRoleEnum,
|
||||||
)
|
)
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
|
||||||
|
BaseTransformer,
|
||||||
map_co_relation,
|
map_co_relation,
|
||||||
normalize_street,
|
normalize_street,
|
||||||
traversal,
|
traversal,
|
||||||
@ -32,7 +33,10 @@ from aki_prj23_transparenzregister.utils.string_tools import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_date_of_birth(data: dict) -> str | None:
|
class V3_Transformer(BaseTransformer): # noqa: N801
|
||||||
|
"""Transformer for data exports from Unternehmensregister (v3)."""
|
||||||
|
|
||||||
|
def parse_date_of_birth(self, data: dict) -> str | None:
|
||||||
"""Retreives the date of birth from a stakeholder entry if possible.
|
"""Retreives the date of birth from a stakeholder entry if possible.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -51,8 +55,7 @@ def parse_date_of_birth(data: dict) -> str | None:
|
|||||||
return base
|
return base
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_role_id_to_enum(self, role_id: str) -> RelationshipRoleEnum:
|
||||||
def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
|
|
||||||
"""Map Unternehmensregister role ID to RelationshipRoleEnum.
|
"""Map Unternehmensregister role ID to RelationshipRoleEnum.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -64,8 +67,7 @@ def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
|
|||||||
mapper = RoleMapper.mapper()
|
mapper = RoleMapper.mapper()
|
||||||
return mapper.get(role_id)
|
return mapper.get(role_id)
|
||||||
|
|
||||||
|
def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
|
||||||
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|
||||||
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -74,7 +76,10 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
Returns:
|
Returns:
|
||||||
CompanyRelationship | None: Relationship if it could be processed
|
CompanyRelationship | None: Relationship if it could be processed
|
||||||
"""
|
"""
|
||||||
if "tns:natuerlichePerson" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
|
if (
|
||||||
|
"tns:natuerlichePerson"
|
||||||
|
in data["tns:beteiligter"]["tns:auswahl_beteiligter"]
|
||||||
|
):
|
||||||
# It's a Company serving as a "Kommanditist" or similar
|
# It's a Company serving as a "Kommanditist" or similar
|
||||||
if (
|
if (
|
||||||
"tns:vorname"
|
"tns:vorname"
|
||||||
@ -91,9 +96,11 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
),
|
),
|
||||||
"location": Location(
|
"location": Location(
|
||||||
**{
|
**{
|
||||||
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
"city": data["tns:beteiligter"][
|
||||||
"tns:natuerlichePerson"
|
"tns:auswahl_beteiligter"
|
||||||
]["tns:anschrift"][-1]["tns:ort"]
|
]["tns:natuerlichePerson"]["tns:anschrift"][-1][
|
||||||
|
"tns:ort"
|
||||||
|
]
|
||||||
if isinstance(
|
if isinstance(
|
||||||
data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
||||||
"tns:natuerlichePerson"
|
"tns:natuerlichePerson"
|
||||||
@ -105,7 +112,7 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
]["tns:anschrift"]["tns:ort"]
|
]["tns:anschrift"]["tns:ort"]
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
"role": map_role_id_to_enum(
|
"role": self.map_role_id_to_enum(
|
||||||
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
||||||
),
|
),
|
||||||
"type": CompanyRelationshipEnum.COMPANY,
|
"type": CompanyRelationshipEnum.COMPANY,
|
||||||
@ -115,15 +122,17 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
**{ # type: ignore
|
**{ # type: ignore
|
||||||
"name": PersonName(
|
"name": PersonName(
|
||||||
**{
|
**{
|
||||||
"firstname": data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
"firstname": data["tns:beteiligter"][
|
||||||
"tns:natuerlichePerson"
|
"tns:auswahl_beteiligter"
|
||||||
]["tns:vollerName"]["tns:vorname"],
|
]["tns:natuerlichePerson"]["tns:vollerName"]["tns:vorname"],
|
||||||
"lastname": data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
"lastname": data["tns:beteiligter"][
|
||||||
"tns:natuerlichePerson"
|
"tns:auswahl_beteiligter"
|
||||||
]["tns:vollerName"]["tns:nachname"],
|
]["tns:natuerlichePerson"]["tns:vollerName"][
|
||||||
|
"tns:nachname"
|
||||||
|
],
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
"date_of_birth": parse_date_of_birth(data),
|
"date_of_birth": self.parse_date_of_birth(data),
|
||||||
"location": Location(
|
"location": Location(
|
||||||
**{
|
**{
|
||||||
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
||||||
@ -140,14 +149,16 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
]["tns:anschrift"]["tns:ort"]
|
]["tns:anschrift"]["tns:ort"]
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
"role": map_role_id_to_enum(
|
"role": self.map_role_id_to_enum(
|
||||||
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
||||||
),
|
),
|
||||||
"type": CompanyRelationshipEnum.PERSON,
|
"type": CompanyRelationshipEnum.PERSON,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
if "tns:organisation" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
|
if "tns:organisation" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
|
||||||
base = data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:organisation"]
|
base = data["tns:beteiligter"]["tns:auswahl_beteiligter"][
|
||||||
|
"tns:organisation"
|
||||||
|
]
|
||||||
|
|
||||||
location = None
|
location = None
|
||||||
if "tns:anschrift" in base:
|
if "tns:anschrift" in base:
|
||||||
@ -161,7 +172,7 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
if "tns:hausnummer" in base["tns:anschrift"]
|
if "tns:hausnummer" in base["tns:anschrift"]
|
||||||
else None,
|
else None,
|
||||||
"zip_code": base["tns:anschrift"]["tns:postleitzahl"]
|
"zip_code": base["tns:anschrift"]["tns:postleitzahl"]
|
||||||
if "tns:potsleitzahl" in base["tns:anschrift"]
|
if "tns:postleitzahl" in base["tns:anschrift"]
|
||||||
else None,
|
else None,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -176,14 +187,14 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
if "tns:hausnummer" in base["tns:sitz"]
|
if "tns:hausnummer" in base["tns:sitz"]
|
||||||
else None,
|
else None,
|
||||||
"zip_code": base["tns:sitz"]["tns:postleitzahl"]
|
"zip_code": base["tns:sitz"]["tns:postleitzahl"]
|
||||||
if "tns:potsleitzahl" in base["tns:sitz"]
|
if "tns:postleitzahl" in base["tns:sitz"]
|
||||||
else None,
|
else None,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
return CompanyToCompanyRelationship(
|
return CompanyToCompanyRelationship(
|
||||||
**{ # type: ignore
|
**{ # type: ignore
|
||||||
"role": map_role_id_to_enum(
|
"role": self.map_role_id_to_enum(
|
||||||
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
|
||||||
),
|
),
|
||||||
"name": remove_traling_and_leading_quotes(
|
"name": remove_traling_and_leading_quotes(
|
||||||
@ -195,8 +206,7 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def loc_from_beteiligung(self, data: dict) -> Location:
|
||||||
def loc_from_beteiligung(data: dict) -> Location:
|
|
||||||
"""Extract the company location from the first relationship in the export.
|
"""Extract the company location from the first relationship in the export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -242,8 +252,7 @@ def loc_from_beteiligung(data: dict) -> Location:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def name_from_beteiligung(self, data: dict) -> str:
|
||||||
def name_from_beteiligung(data: dict) -> str:
|
|
||||||
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -266,8 +275,7 @@ def name_from_beteiligung(data: dict) -> str:
|
|||||||
name = traversal(data, path)
|
name = traversal(data, path)
|
||||||
return remove_traling_and_leading_quotes(name)
|
return remove_traling_and_leading_quotes(name)
|
||||||
|
|
||||||
|
def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
|
||||||
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|
||||||
"""Extracts the company type from a given Unternehmensregister export.
|
"""Extracts the company type from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -300,9 +308,8 @@ def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|||||||
return CompanyTypeEnum("Kommanditgesellschaft")
|
return CompanyTypeEnum("Kommanditgesellschaft")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def map_capital( # noqa: PLR0912
|
def map_capital( # noqa: PLR0912
|
||||||
data: dict, company_type: CompanyTypeEnum
|
self, data: dict, company_type: CompanyTypeEnum
|
||||||
) -> Capital | None:
|
) -> Capital | None:
|
||||||
"""Extracts the company capital from the given Unternehmensregister export.
|
"""Extracts the company capital from the given Unternehmensregister export.
|
||||||
|
|
||||||
@ -386,8 +393,7 @@ def map_capital( # noqa: PLR0912
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def map_business_purpose(self, data: dict) -> str | None:
|
||||||
def map_business_purpose(data: dict) -> str | None:
|
|
||||||
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -402,8 +408,7 @@ def map_business_purpose(data: dict) -> str | None:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_founding_date(self, data: dict) -> str | None:
|
||||||
def map_founding_date(data: dict) -> str | None:
|
|
||||||
"""Extracts the founding date from a given Unternehmensregister export.
|
"""Extracts the founding date from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -424,7 +429,10 @@ def map_founding_date(data: dict) -> str | None:
|
|||||||
)
|
)
|
||||||
if len(entry_date) == 1:
|
if len(entry_date) == 1:
|
||||||
return transform_date_to_iso(entry_date[0])
|
return transform_date_to_iso(entry_date[0])
|
||||||
if "tns:satzungsdatum" in data["tns:fachdatenRegister"]["tns:basisdatenRegister"]:
|
if (
|
||||||
|
"tns:satzungsdatum"
|
||||||
|
in data["tns:fachdatenRegister"]["tns:basisdatenRegister"]
|
||||||
|
):
|
||||||
path = [
|
path = [
|
||||||
"tns:fachdatenRegister",
|
"tns:fachdatenRegister",
|
||||||
"tns:basisdatenRegister",
|
"tns:basisdatenRegister",
|
||||||
@ -436,8 +444,7 @@ def map_founding_date(data: dict) -> str | None:
|
|||||||
# No reliable answer
|
# No reliable answer
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def map_hr_number(self, data: dict) -> str:
|
||||||
def map_hr_number(data: dict) -> str:
|
|
||||||
"""Extract the HR number from a given Unternehmensregister export.
|
"""Extract the HR number from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -460,8 +467,7 @@ def map_hr_number(data: dict) -> str:
|
|||||||
return base["tns:aktenzeichen.freitext"]
|
return base["tns:aktenzeichen.freitext"]
|
||||||
raise KeyError("Could not find HR number")
|
raise KeyError("Could not find HR number")
|
||||||
|
|
||||||
|
def map_district_court(self, data: dict) -> DistrictCourt:
|
||||||
def map_district_court(data: dict) -> DistrictCourt:
|
|
||||||
"""Extract the district court from a given Unternehmensregister export.
|
"""Extract the district court from a given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -481,12 +487,12 @@ def map_district_court(data: dict) -> DistrictCourt:
|
|||||||
]
|
]
|
||||||
path = [*base_path, "tns:bezeichnung", "tns:bezeichnung.aktuell"]
|
path = [*base_path, "tns:bezeichnung", "tns:bezeichnung.aktuell"]
|
||||||
name = traversal(data, path)
|
name = traversal(data, path)
|
||||||
|
|
||||||
path = [*base_path, "tns:anschrift", "tns:ort"]
|
path = [*base_path, "tns:anschrift", "tns:ort"]
|
||||||
city = traversal(data, path)
|
city = traversal(data, path)
|
||||||
return DistrictCourt(name=name, city=city)
|
return DistrictCourt(name=name, city=city)
|
||||||
|
|
||||||
|
def map_company_id(self, data: dict) -> CompanyID:
|
||||||
def map_company_id(data: dict) -> CompanyID:
|
|
||||||
"""Retrieve Company ID from export.
|
"""Retrieve Company ID from export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -496,18 +502,19 @@ def map_company_id(data: dict) -> CompanyID:
|
|||||||
CompanyID: ID of the company
|
CompanyID: ID of the company
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return CompanyID(map_hr_number(data), map_district_court(data)) # type: ignore
|
return CompanyID(hr_number=self.map_hr_number(data), district_court=self.map_district_court(data)) # type: ignore
|
||||||
except KeyError:
|
except KeyError:
|
||||||
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][0][
|
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"][
|
||||||
"tns:beteiligter"
|
"tns:beteiligung"
|
||||||
]["tns:auswahl_beteiligter"]["tns:organisation"]["tns:registereintragung"][
|
][0]["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:organisation"][
|
||||||
|
"tns:registereintragung"
|
||||||
|
][
|
||||||
"tns:registernummer"
|
"tns:registernummer"
|
||||||
]
|
]
|
||||||
district_court = map_district_court(data)
|
district_court = self.map_district_court(data)
|
||||||
return CompanyID(hr_number=hr_number, district_court=district_court)
|
return CompanyID(hr_number=hr_number, district_court=district_court)
|
||||||
|
|
||||||
|
def map_last_update(self, data: dict) -> str:
|
||||||
def map_last_update(data: dict) -> str:
|
|
||||||
"""Extract last update date from export.
|
"""Extract last update date from export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -519,9 +526,8 @@ def map_last_update(data: dict) -> str:
|
|||||||
path = ["tns:fachdatenRegister", "tns:auszug", "tns:letzteEintragung"]
|
path = ["tns:fachdatenRegister", "tns:auszug", "tns:letzteEintragung"]
|
||||||
return traversal(data, path)
|
return traversal(data, path)
|
||||||
|
|
||||||
|
|
||||||
# TODO class model with inheritance - only difference: Determine root in __init__
|
# TODO class model with inheritance - only difference: Determine root in __init__
|
||||||
def map_unternehmensregister_json(data: dict) -> Company:
|
def map_unternehmensregister_json(self, data: dict) -> Company:
|
||||||
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -534,21 +540,21 @@ def map_unternehmensregister_json(data: dict) -> Company:
|
|||||||
data = data[root_key]
|
data = data[root_key]
|
||||||
result: dict = {"relationships": []}
|
result: dict = {"relationships": []}
|
||||||
|
|
||||||
result["id"] = map_company_id(data)
|
result["id"] = self.map_company_id(data)
|
||||||
result["name"] = name_from_beteiligung(data)
|
result["name"] = self.name_from_beteiligung(data)
|
||||||
|
|
||||||
result["location"] = loc_from_beteiligung(data)
|
result["location"] = self.loc_from_beteiligung(data)
|
||||||
result["last_update"] = map_last_update(data)
|
result["last_update"] = self.map_last_update(data)
|
||||||
|
|
||||||
result["company_type"] = map_rechtsform(result["name"], data)
|
result["company_type"] = self.map_rechtsform(result["name"], data)
|
||||||
result["capital"] = map_capital(data, result["company_type"])
|
result["capital"] = self.map_capital(data, result["company_type"])
|
||||||
result["business_purpose"] = map_business_purpose(data)
|
result["business_purpose"] = self.map_business_purpose(data)
|
||||||
result["founding_date"] = map_founding_date(data)
|
result["founding_date"] = self.map_founding_date(data)
|
||||||
|
|
||||||
for i in range(
|
for i in range(
|
||||||
2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
|
2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
|
||||||
):
|
):
|
||||||
people = parse_stakeholder(
|
people = self.parse_stakeholder(
|
||||||
data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][i]
|
data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][i]
|
||||||
)
|
)
|
||||||
result["relationships"].append(people)
|
result["relationships"].append(people)
|
||||||
|
@ -16,10 +16,12 @@ from aki_prj23_transparenzregister.models.company import (
|
|||||||
PersonToCompanyRelationship,
|
PersonToCompanyRelationship,
|
||||||
RelationshipRoleEnum,
|
RelationshipRoleEnum,
|
||||||
)
|
)
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import (
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1 import (
|
||||||
v1 as transform,
|
V1_Transformer,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
transform = V1_Transformer()
|
||||||
|
|
||||||
|
|
||||||
def test_parse_stakeholder_org_hidden_in_person() -> None:
|
def test_parse_stakeholder_org_hidden_in_person() -> None:
|
||||||
data = {
|
data = {
|
||||||
@ -656,31 +658,31 @@ def test_map_last_update() -> None:
|
|||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_co_relation"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_co_relation"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_company_id"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_company_id"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.name_from_beteiligung"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.name_from_beteiligung"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.loc_from_beteiligung"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.loc_from_beteiligung"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_last_update"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_last_update"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_rechtsform"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_rechtsform"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_capital"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_capital"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_business_purpose"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_business_purpose"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_founding_date"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_founding_date"
|
||||||
)
|
)
|
||||||
@patch(
|
@patch(
|
||||||
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.parse_stakeholder"
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.parse_stakeholder"
|
||||||
)
|
)
|
||||||
def test_map_unternehmensregister_json( # noqa: PLR0913
|
def test_map_unternehmensregister_json( # noqa: PLR0913
|
||||||
mock_map_parse_stakeholder: Mock,
|
mock_map_parse_stakeholder: Mock,
|
||||||
|
@ -0,0 +1,731 @@
|
|||||||
|
"""Testing utils/data_extraction/unternehmensregister/transform.py."""
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
from aki_prj23_transparenzregister.models.company import (
|
||||||
|
Capital,
|
||||||
|
CapitalTypeEnum,
|
||||||
|
Company,
|
||||||
|
CompanyID,
|
||||||
|
CompanyRelationshipEnum,
|
||||||
|
CompanyToCompanyRelationship,
|
||||||
|
CompanyTypeEnum,
|
||||||
|
CurrencyEnum,
|
||||||
|
DistrictCourt,
|
||||||
|
Location,
|
||||||
|
PersonName,
|
||||||
|
PersonToCompanyRelationship,
|
||||||
|
RelationshipRoleEnum,
|
||||||
|
)
|
||||||
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3 import (
|
||||||
|
V3_Transformer,
|
||||||
|
)
|
||||||
|
|
||||||
|
transform = V3_Transformer()
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_org_hidden_in_person() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:natuerlichePerson": {
|
||||||
|
"tns:vollerName": {"tns:nachname": '"Some Company KG'},
|
||||||
|
"tns:anschrift": {"tns:ort": "Area 51"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tns:rolle": {"tns:rollenbezeichnung": {"code": "275"}},
|
||||||
|
}
|
||||||
|
expected_result = CompanyToCompanyRelationship(
|
||||||
|
role=RelationshipRoleEnum.KOMMANDITIST, # type: ignore
|
||||||
|
name="Some Company KG",
|
||||||
|
type=CompanyRelationshipEnum.COMPANY,
|
||||||
|
location=Location(**{"city": "Area 51"}),
|
||||||
|
)
|
||||||
|
assert transform.parse_stakeholder(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_person() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:natuerlichePerson": {
|
||||||
|
"tns:vollerName": {
|
||||||
|
"tns:vorname": "Stephen",
|
||||||
|
"tns:nachname": "King",
|
||||||
|
},
|
||||||
|
"tns:anschrift": {"tns:ort": "Maine"},
|
||||||
|
"tns:geburt": {"tns:geburtsdatum": "1947-09-21"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tns:rolle": {"tns:rollenbezeichnung": {"code": "269"}},
|
||||||
|
}
|
||||||
|
expected_result = PersonToCompanyRelationship(
|
||||||
|
role=RelationshipRoleEnum.GESCHAEFTSLEITER, # type: ignore
|
||||||
|
date_of_birth="1947-09-21",
|
||||||
|
name=PersonName(**{"firstname": "Stephen", "lastname": "King"}),
|
||||||
|
type=CompanyRelationshipEnum.PERSON,
|
||||||
|
location=Location(**{"city": "Maine"}),
|
||||||
|
)
|
||||||
|
assert transform.parse_stakeholder(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_person_missing_date_of_birth() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:natuerlichePerson": {
|
||||||
|
"tns:vollerName": {
|
||||||
|
"tns:vorname": "Stephen",
|
||||||
|
"tns:nachname": "King",
|
||||||
|
},
|
||||||
|
"tns:anschrift": {"tns:ort": "Maine"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tns:rolle": {"tns:rollenbezeichnung": {"code": "269"}},
|
||||||
|
}
|
||||||
|
expected_result = PersonToCompanyRelationship(
|
||||||
|
role=RelationshipRoleEnum.GESCHAEFTSLEITER, # type: ignore
|
||||||
|
date_of_birth=None,
|
||||||
|
name=PersonName(**{"firstname": "Stephen", "lastname": "King"}),
|
||||||
|
type=CompanyRelationshipEnum.PERSON,
|
||||||
|
location=Location(**{"city": "Maine"}),
|
||||||
|
)
|
||||||
|
assert transform.parse_stakeholder(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_org() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:bezeichnung": {
|
||||||
|
"tns:bezeichnung.aktuell": "Transparenzregister kG"
|
||||||
|
},
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:ort": "Iserlohn",
|
||||||
|
"tns:strasse": "Hauptstrasse",
|
||||||
|
"tns:hausnummer": "42",
|
||||||
|
"tns:postleitzahl": "58636",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tns:rolle": {"tns:rollenbezeichnung": {"code": "268"}},
|
||||||
|
}
|
||||||
|
expected_result = CompanyToCompanyRelationship(
|
||||||
|
name="Transparenzregister kG",
|
||||||
|
role=RelationshipRoleEnum.DIREKTOR, # type: ignore
|
||||||
|
type=CompanyRelationshipEnum.COMPANY,
|
||||||
|
location=Location(
|
||||||
|
**{
|
||||||
|
"city": "Iserlohn",
|
||||||
|
"zip_code": "58636",
|
||||||
|
"house_number": "42",
|
||||||
|
"street": "Hauptstrasse",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
assert transform.parse_stakeholder(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_org_loc_from_sitz() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:bezeichnung": {
|
||||||
|
"tns:bezeichnung.aktuell": "Transparenzregister kG"
|
||||||
|
},
|
||||||
|
"tns:sitz": {
|
||||||
|
"tns:ort": "Iserlohn",
|
||||||
|
"tns:strasse": "Hauptstrasse",
|
||||||
|
"tns:hausnummer": "42",
|
||||||
|
"tns:postleitzahl": "58636",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tns:rolle": {"tns:rollenbezeichnung": {"code": "268"}},
|
||||||
|
}
|
||||||
|
expected_result = CompanyToCompanyRelationship(
|
||||||
|
name="Transparenzregister kG",
|
||||||
|
role=RelationshipRoleEnum.DIREKTOR, # type: ignore
|
||||||
|
type=CompanyRelationshipEnum.COMPANY,
|
||||||
|
location=Location(
|
||||||
|
**{
|
||||||
|
"city": "Iserlohn",
|
||||||
|
"zip_code": "58636",
|
||||||
|
"house_number": "42",
|
||||||
|
"street": "Hauptstrasse",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
assert transform.parse_stakeholder(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_stakeholder_no_result() -> None:
|
||||||
|
data: dict = {"tns:beteiligter": {"tns:auswahl_beteiligter": {}}} # type: ignore
|
||||||
|
assert transform.parse_stakeholder(data) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_loc_from_beteiligung() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:strasse": "Gewerbestraße",
|
||||||
|
"tns:hausnummer": "8",
|
||||||
|
"tns:postleitzahl": "72535",
|
||||||
|
"tns:ort": "Heroldstatt",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = Location(
|
||||||
|
city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535"
|
||||||
|
)
|
||||||
|
assert transform.loc_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_loc_from_beteiligung_number_contained_in_street() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:strasse": "Gewerbestraße8",
|
||||||
|
"tns:postleitzahl": "72535",
|
||||||
|
"tns:ort": "Heroldstatt",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = Location(
|
||||||
|
city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535"
|
||||||
|
)
|
||||||
|
assert transform.loc_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_loc_from_beteiligung_no_result() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:postleitzahl": "72535",
|
||||||
|
"tns:ort": "Heroldstatt",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = Location(
|
||||||
|
city="Heroldstatt", house_number=None, street=None, zip_code="72535"
|
||||||
|
)
|
||||||
|
assert transform.loc_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_loc_from_beteiligung_combine() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:postleitzahl": "72535",
|
||||||
|
"tns:strasse": "Pliangenserstr. 40",
|
||||||
|
"tns:hausnummer": "a",
|
||||||
|
"tns:ort": "Heroldstatt",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = Location(
|
||||||
|
city="Heroldstatt",
|
||||||
|
house_number="40a",
|
||||||
|
street="Pliangenserstraße",
|
||||||
|
zip_code="72535",
|
||||||
|
)
|
||||||
|
assert transform.loc_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_name_from_beteiligung() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:bezeichnung": {
|
||||||
|
"tns:bezeichnung.aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
|
||||||
|
assert transform.name_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_name_from_beteiligung_remove_quotes() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:bezeichnung": {
|
||||||
|
"tns:bezeichnung.aktuell": '"Siemes Verwaltungs-GmbH"'
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_result = "Siemes Verwaltungs-GmbH"
|
||||||
|
assert transform.name_from_beteiligung(data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_rechtsform() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:basisdatenRegister": {
|
||||||
|
"tns:rechtstraeger": {
|
||||||
|
"tns:angabenZurRechtsform": {
|
||||||
|
"tns:rechtsform": {
|
||||||
|
"code": "Gesellschaft mit beschränkter Haftung"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expected_result = CompanyTypeEnum.GMBH
|
||||||
|
assert transform.map_rechtsform("", data) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_rechtsform_from_name() -> None:
|
||||||
|
data = [
|
||||||
|
("GEA Farm Technologies GmbH", "Gesellschaft mit beschränkter Haftung"),
|
||||||
|
("Atos SE", "Europäische Aktiengesellschaft (SE)"),
|
||||||
|
("Bilkenroth KG", "Kommanditgesellschaft"),
|
||||||
|
("jfoiahfo8sah 98548902 öhz ö", None),
|
||||||
|
]
|
||||||
|
|
||||||
|
for company_name, expected_result in data:
|
||||||
|
assert transform.map_rechtsform(company_name, {}) == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_kg_single() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.EURO, value=69000, type=CapitalTypeEnum.HAFTEINLAGE # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:personengesellschaft": {
|
||||||
|
"tns:zusatzKG": {
|
||||||
|
"tns:datenKommanditist": {
|
||||||
|
"tns:hafteinlage": {
|
||||||
|
"tns:zahl": str(capital.value),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.KG) # type: ignore
|
||||||
|
assert result == capital
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_kg_sum() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.EURO, value=20000, type=CapitalTypeEnum.HAFTEINLAGE # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:personengesellschaft": {
|
||||||
|
"tns:zusatzKG": {
|
||||||
|
"tns:datenKommanditist": [
|
||||||
|
{
|
||||||
|
"tns:hafteinlage": {
|
||||||
|
"tns:zahl": str(10000),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"tns:hafteinlage": {
|
||||||
|
"tns:zahl": str(10000),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.KG) # type: ignore
|
||||||
|
assert result == capital
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_no_fachdaten() -> None:
|
||||||
|
data: dict = {"tns:fachdatenRegister": {}}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.KG) # type: ignore
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_gmbh() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:kapitalgesellschaft": {
|
||||||
|
"tns:zusatzGmbH": {
|
||||||
|
"tns:stammkapital": {
|
||||||
|
"tns:zahl": str(capital.value),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.GMBH) # type: ignore
|
||||||
|
assert result == capital
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_ag() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.GRUNDKAPITAL # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:kapitalgesellschaft": {
|
||||||
|
"tns:zusatzAktiengesellschaft": {
|
||||||
|
"tns:grundkapital": {
|
||||||
|
"tns:hoehe": {
|
||||||
|
"tns:zahl": str(capital.value),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.SE) # type: ignore
|
||||||
|
assert result == capital
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_personengesellschaft() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:personengesellschaft": {
|
||||||
|
"tns:zusatzGmbH": {
|
||||||
|
"tns:stammkapital": {
|
||||||
|
"tns:zahl": str(capital.value),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.OHG) # type: ignore
|
||||||
|
assert result == capital
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_einzelkaufmann() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"Personengesellschaft": {
|
||||||
|
"tns:zusatzGmbH": {
|
||||||
|
"tns:stammkapital": {
|
||||||
|
"tns:zahl": str(capital.value),
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.EINZELKAUFMANN) # type: ignore
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_capital_partial_null_values() -> None:
|
||||||
|
capital = Capital(
|
||||||
|
currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL # type: ignore
|
||||||
|
)
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:auswahl_zusatzangaben": {
|
||||||
|
"tns:personengesellschaft": {
|
||||||
|
"tns:zusatzGmbH": {
|
||||||
|
"tns:stammkapital": {
|
||||||
|
"tns:zahl": None,
|
||||||
|
"tns:waehrung": {"code": capital.currency},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_capital(data, CompanyTypeEnum.OHG) # type: ignore
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_business_purpose() -> None:
|
||||||
|
business_purpose = "Handel mit Betäubungsmitteln aller Art"
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:basisdatenRegister": {"tns:gegenstand": business_purpose}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_business_purpose(data)
|
||||||
|
assert result == business_purpose
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_business_purpose_no_result() -> None:
|
||||||
|
data: dict = {}
|
||||||
|
|
||||||
|
result = transform.map_business_purpose(data)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_founding_date_from_tag_der_ersten_eintragung() -> None:
|
||||||
|
data = {
|
||||||
|
"some entry": "Tag der ersten Eintragung: 01.05.2004",
|
||||||
|
"some other entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
|
||||||
|
}
|
||||||
|
expected_result = "2004-05-01"
|
||||||
|
result = transform.map_founding_date(data)
|
||||||
|
assert result == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_founding_date_from_gesellschaftsvertrag() -> None:
|
||||||
|
data = {
|
||||||
|
"some entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
|
||||||
|
"some other entry": "Das Wesen der Rekursion ist der Selbstaufruf Gesellschaftsvertrag vom 22.12.1996 Hallo Welt",
|
||||||
|
}
|
||||||
|
expected_result = "1996-12-22"
|
||||||
|
result = transform.map_founding_date(data)
|
||||||
|
assert result == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_founding_date_from_gruendungsdatum() -> None:
|
||||||
|
data = {
|
||||||
|
"tns:fachdatenRegister": {
|
||||||
|
"tns:basisdatenRegister": {
|
||||||
|
"tns:satzungsdatum": {"tns:aktuellesSatzungsdatum": "1998-01-01"}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expected_result = "1998-01-01"
|
||||||
|
result = transform.map_founding_date(data)
|
||||||
|
assert result == expected_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_founding_date_no_result() -> None:
|
||||||
|
data: dict = {"tns:fachdatenRegister": {"tns:basisdatenRegister": {}}}
|
||||||
|
result = transform.map_founding_date(data)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_company_id() -> None:
|
||||||
|
district_court = DistrictCourt("Amtsgericht Ulm", "Ulm")
|
||||||
|
company_id = CompanyID(district_court, "HRA 4711")
|
||||||
|
data = {
|
||||||
|
"tns:grunddaten": {
|
||||||
|
"tns:verfahrensdaten": {
|
||||||
|
"tns:instanzdaten": {
|
||||||
|
"tns:aktenzeichen": {
|
||||||
|
"tns:auswahl_aktenzeichen": {
|
||||||
|
"tns:aktenzeichen.freitext": company_id.hr_number
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"tns:beteiligung": [
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
"tns:beteiligter": {
|
||||||
|
"tns:auswahl_beteiligter": {
|
||||||
|
"tns:organisation": {
|
||||||
|
"tns:bezeichnung": {
|
||||||
|
"tns:bezeichnung.aktuell": district_court.name
|
||||||
|
},
|
||||||
|
"tns:anschrift": {
|
||||||
|
"tns:ort": district_court.city,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
result = transform.map_company_id(data)
|
||||||
|
assert result == company_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_map_last_update() -> None:
|
||||||
|
date = "2024-01-01"
|
||||||
|
data = {"tns:fachdatenRegister": {"tns:auszug": {"tns:letzteEintragung": date}}}
|
||||||
|
result = transform.map_last_update(data)
|
||||||
|
assert result == date
|
||||||
|
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.map_co_relation"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_company_id"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.name_from_beteiligung"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.loc_from_beteiligung"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_last_update"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_rechtsform"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_capital"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_business_purpose"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_founding_date"
|
||||||
|
)
|
||||||
|
@patch(
|
||||||
|
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.parse_stakeholder"
|
||||||
|
)
|
||||||
|
def test_map_unternehmensregister_json( # noqa: PLR0913
|
||||||
|
mock_map_parse_stakeholder: Mock,
|
||||||
|
mock_map_founding_date: Mock,
|
||||||
|
mock_map_business_purpose: Mock,
|
||||||
|
mock_map_capital: Mock,
|
||||||
|
mock_map_rechtsform: Mock,
|
||||||
|
mock_map_last_update: Mock,
|
||||||
|
mock_loc_from_beteiligung: Mock,
|
||||||
|
mock_map_name_from_beteiligung: Mock,
|
||||||
|
mock_map_company_id: Mock,
|
||||||
|
mock_map_co_relation: Mock,
|
||||||
|
) -> None:
|
||||||
|
expected_result = Company(
|
||||||
|
**{ # type: ignore
|
||||||
|
"id": Mock(),
|
||||||
|
"name": Mock(),
|
||||||
|
"location": Mock(),
|
||||||
|
"last_update": Mock(),
|
||||||
|
"company_type": Mock(),
|
||||||
|
"capital": Mock(),
|
||||||
|
"business_purpose": Mock(),
|
||||||
|
"founding_date": Mock(),
|
||||||
|
"relationships": [Mock()],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_map_company_id.return_value = expected_result.id
|
||||||
|
mock_map_name_from_beteiligung.return_value = expected_result.name
|
||||||
|
mock_loc_from_beteiligung.return_value = expected_result.location
|
||||||
|
mock_map_last_update.return_value = expected_result.last_update
|
||||||
|
mock_map_rechtsform.return_value = expected_result.company_type
|
||||||
|
mock_map_capital.return_value = expected_result.capital
|
||||||
|
mock_map_business_purpose.return_value = expected_result.business_purpose
|
||||||
|
mock_map_founding_date.return_value = expected_result.founding_date
|
||||||
|
mock_map_parse_stakeholder.return_value = expected_result.relationships[0]
|
||||||
|
mock_map_co_relation.side_effect = lambda x: x
|
||||||
|
|
||||||
|
data: dict = {
|
||||||
|
"rootLayerWithSomeStuipStringNooneCaresAbout": {
|
||||||
|
"tns:grunddaten": {"tns:verfahrensdaten": {"tns:beteiligung": [{}, {}, {}]}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = transform.map_unternehmensregister_json(data)
|
||||||
|
assert result == expected_result
|
Reference in New Issue
Block a user