Feat/233 incremental company extension (#322)

1. New app capable of processing the `missing_company` table has been
added
2. Data transformation for Unternehmensregister data has been extended
to handle v1 and v3 data with a generic layer in between selecting the
right API upon request

Sorry for the big PR, if preferred I can give a quick tour through the
code rather than having you review every line of code
This commit is contained in:
Tristan Nolde 2023-11-08 14:36:44 +01:00 committed by GitHub
commit f4998a6fae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 5340 additions and 750 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
# Data blobs
**/*.xml
**/*.json
# LaTeX temp files
**/*.aux
**/*-blx.bib

View File

@ -0,0 +1,145 @@
"""Retrieve missing companies from unternehmensregister."""
import argparse
import dataclasses
import glob
import json
import multiprocessing
import os
import sys
import tempfile
from loguru import logger
from tqdm import tqdm
from aki_prj23_transparenzregister.config.config_providers import (
ConfigProvider,
get_config_provider,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
extract,
load,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
main as transform,
)
from aki_prj23_transparenzregister.utils.logger_config import (
add_logger_options_to_argparse,
configer_logger,
)
from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
CompanyMongoService,
)
from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
from aki_prj23_transparenzregister.utils.sql import connector, entities
def work(company_name: str, config_provider: ConfigProvider) -> None:
    """Scrape, transform and load a single missing company.

    The company is searched on Unternehmensregister, the downloaded XML files
    are converted to JSON, mapped to the internal company model and loaded
    into MongoDB. If at least one file was loaded, the matching
    ``MissingCompany`` row is flagged as ``searched_for``. All intermediate
    files live in a temporary directory that is removed on return.

    Scraping, mapping and SQL update failures are logged and end the
    processing of this company.

    Args:
        company_name (str): Name of the company to search for
        config_provider (ConfigProvider): ConfigProvider
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        xml_dir = os.path.join(tmp_dir, "xml")
        os.makedirs(xml_dir, exist_ok=True)
        try:
            extract.scrape(company_name, xml_dir, True, True)  # type: ignore
        except Exception as e:
            logger.error(e)
            return

        output_path = os.path.join(tmp_dir, "transformed")
        os.makedirs(output_path, exist_ok=True)
        json_dir = os.path.join(tmp_dir, "json")
        os.makedirs(json_dir, exist_ok=True)
        transform.transform_xml_to_json(
            xml_dir,
            json_dir,
        )

        # glob.glob1 is deprecated; escape the directory so only the file
        # name part is interpreted as a pattern.
        json_pattern = os.path.join(glob.escape(json_dir), "*.json")
        for path in tqdm(glob.glob(json_pattern)):
            try:
                with open(path, encoding="utf-8") as file_object:
                    company_mapped = transform.map_unternehmensregister_json(
                        json.load(file_object)
                    )
                # File name: alphanumeric characters of the company name only,
                # capped at 50 characters.
                name = "".join(e for e in company_mapped.name if e.isalnum())[:50]
                with open(
                    os.path.join(output_path, f"{name}.json"),
                    "w+",
                    encoding="utf-8",
                ) as export_file:
                    json.dump(
                        dataclasses.asdict(company_mapped),
                        export_file,
                        ensure_ascii=False,
                    )
            except Exception as e:
                logger.error(e)
                return

        mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
        try:
            company_mongo_service = CompanyMongoService(mongo_connector)
            num_processed = load.load_directory_to_mongo(
                output_path, company_mongo_service
            )
        finally:
            # Release the connection even when loading fails.
            mongo_connector.client.close()

        try:
            if num_processed > 0:
                with connector.get_session(config_provider) as session:
                    company = (
                        session.query(entities.MissingCompany)  # type: ignore
                        .where(entities.MissingCompany.name == company_name)
                        .first()
                    )
                    if company is None:
                        # Nothing to flag; report it explicitly instead of
                        # failing with an AttributeError on None.
                        logger.error(
                            f"No MissingCompany entry found for {company_name}"
                        )
                        return
                    company.searched_for = True  # type: ignore
                    session.commit()
                    logger.info(f"Processed {company_name}")
        except Exception as e:
            logger.error(e)
            return
if __name__ == "__main__":
    # NOTE(review): prog/description/epilog look copied from the webserver
    # app and do not describe this script — confirm and update.
    parser = argparse.ArgumentParser(
        prog="Transparenzregister Webserver",
        description="Starts an Dash Webserver that shows our Analysis.",
        epilog="Example: webserver --log-level ERROR --log-path print.log",
    )
    parser.add_argument(
        "config",
        metavar="config",
        default="ENV",
        # Without nargs="?" a positional argument is mandatory and the
        # default above would never be used.
        nargs="?",
    )
    add_logger_options_to_argparse(parser)
    parsed = parser.parse_args(sys.argv[1:])
    configer_logger(namespace=parsed)
    config = parsed.config
    config_provider = get_config_provider(config)

    # Read the names while the session is open and close it afterwards; the
    # workers open their own sessions.
    with connector.get_session(config_provider) as session:
        missing_companies = (
            session.query(entities.MissingCompany)
            .where(entities.MissingCompany.searched_for == False)  # noqa
            .all()
        )
        # One (company name, config provider) tuple per call of work().
        params = [(company.name, config_provider) for company in missing_companies]

    if params:
        batch_size = 5
        pool = multiprocessing.Pool(processes=batch_size)
        try:
            # Scrape data from unternehmensregister, one company per task
            pool.starmap(work, params)
        finally:
            # Close the Pool to prevent any more tasks from being submitted
            pool.close()
            # Wait for all the processes to complete
            pool.join()

View File

@ -3,7 +3,6 @@
import glob
import multiprocessing
import os
from pathlib import Path
from loguru import logger
from selenium import webdriver
@ -13,14 +12,22 @@ from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
def scrape(query: str, download_dir: list[str]) -> None:
def scrape(
query: str,
download_dir: str,
full_match: bool = False,
early_stopping: bool = False,
) -> None:
"""Fetch results from Unternehmensregister for given query.
Args:
query (str): Search Query (RegEx supported)
download_dir (str): Directory to place output files in
full_match (bool, optional): Only scrape the result whose name exactly matches the query. Defaults to False.
early_stopping (bool, optional): Stop scraping after first page. Defaults to False.
"""
download_path = os.path.join(str(Path.cwd()), *download_dir)
# download_path = os.path.join(str(Path.cwd()), *download_dir)
download_path = download_dir
options = webdriver.ChromeOptions()
preferences = {
"profile.default_content_settings.popups": 0,
@ -34,6 +41,7 @@ def scrape(query: str, download_dir: list[str]) -> None:
}
options.add_argument("--headless=new")
options.add_experimental_option("prefs", preferences)
options.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome(options=options)
@ -73,7 +81,9 @@ def scrape(query: str, download_dir: list[str]) -> None:
]
for index, company_link in enumerate(companies_tab):
company_name = company_names[index]
if company_name in processed_companies:
if company_name in processed_companies or (
full_match is True and company_name != query
):
continue
# Go to intermediary page
company_link.click()
@ -107,7 +117,7 @@ def scrape(query: str, download_dir: list[str]) -> None:
try:
wait.until(
lambda: wait_for_download_condition(download_path, num_files) # type: ignore
lambda x: wait_for_download_condition(download_path, num_files) # type: ignore
)
file_name = "".join(e for e in company_name if e.isalnum()) + ".xml"
rename_latest_file(
@ -120,6 +130,10 @@ def scrape(query: str, download_dir: list[str]) -> None:
finally:
for _ in range(6):
driver.back()
if company_name == query and full_match is True:
break # noqa: B012
if early_stopping is True:
break
driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click()
driver.close()

View File

@ -14,17 +14,36 @@ from aki_prj23_transparenzregister.utils.mongo.connector import (
MongoConnector,
)
def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int:
    """Load all json files in a directory to MongoDB company collection.

    Each ``*.json`` file directly inside ``base_path`` is parsed, turned into
    a ``Company`` and handed to ``service.migrations_of_base_data``.

    Args:
        base_path (str): Directory to scan
        service (CompanyMongoService): MongoDB service

    Returns:
        int: Number of processed files
    """
    num_processed = 0
    # glob.glob1 is deprecated; escape the directory so only the file name
    # part is interpreted as a pattern.
    pattern = os.path.join(glob.escape(base_path), "*.json")
    for path in tqdm(glob.glob(pattern)):
        with open(path, encoding="utf-8") as file_object:
            data = json.load(file_object)
        company: Company = Company(**data)
        service.migrations_of_base_data(company)
        num_processed += 1
    return num_processed
if __name__ == "__main__":
provider = JsonFileConfigProvider("secrets.json")
conn_string = provider.get_mongo_connection_string()
connector = MongoConnector(conn_string)
service = CompanyMongoService(connector)
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
for file in tqdm(glob.glob1(f"{base_path}/transformed", "*.json")):
path = os.path.join(f"{base_path}/transformed", file)
with open(path, encoding="utf-8") as file_object:
data = json.loads(file_object.read())
company: Company = Company(**data)
service.migrations_of_base_data(company)
load_directory_to_mongo(
"./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed",
service,
)

View File

@ -1,590 +0,0 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import dataclasses
import glob
import json
import os
import re
import sys
import xmltodict
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.string_tools import (
remove_traling_and_leading_quotes,
transform_date_to_iso,
)
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
    """Convert all xml files in a directory to json files.

    The source directory is searched recursively. Every result is written
    flat into ``target_dir`` under the source file name with a ``.json``
    extension.

    Args:
        source_dir (str): Directory hosting the xml files
        target_dir (str): Target directory to move json files to
    """
    # Join with a separator: plain concatenation turned "<dir>**/*.xml" into
    # a prefix match on the directory name instead of a recursive search.
    pattern = os.path.join(glob.escape(source_dir), "**", "*.xml")
    for match in glob.glob(pattern, recursive=True):
        source_path = os.path.normpath(match)
        stem = os.path.splitext(os.path.basename(source_path))[0]
        target_path = os.path.join(target_dir, f"{stem}.json")

        with open(source_path, encoding="utf-8") as source_file:
            # deepcode ignore HandleUnicode: Weird XML format no other solution
            data = xmltodict.parse(source_file.read().encode())
        with open(target_path, "w", encoding="utf-8") as json_file:
            json_file.write(json.dumps(data))
def parse_date_of_birth(data: dict) -> str | None:
"""Retreives the date of birth from a stakeholder entry if possible.
Args:
data (dict): Stakeholder data
Returns:
str | None: date of birth or None if not found
"""
if "Geburt" in (base := data["Beteiligter"]["Natuerliche_Person"]):
base = base["Geburt"]["Geburtsdatum"]
if isinstance(base, str):
return base
return None
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
    """Build the relationship described by a single "Beteiligung" entry.

    Args:
        data (dict): Data export

    Returns:
        CompanyRelationship | None: Relationship for a natural person or an
            organisation; None if the entry contains neither
    """
    party = data["Beteiligter"]

    if "Natuerliche_Person" in party:
        person = party["Natuerliche_Person"]
        full_name = person["Voller_Name"]

        # It's a Company serving as a "Kommanditist" or similar
        if full_name["Vorname"] is None:
            company_name = remove_traling_and_leading_quotes(full_name["Nachname"])
            address = person["Anschrift"]
            # With several addresses the last one is used.
            city = address[-1]["Ort"] if isinstance(address, list) else address["Ort"]
            return CompanyToCompanyRelationship(  # type: ignore
                name=company_name,
                location=Location(city=city),
                role=RelationshipRoleEnum(
                    data["Rolle"]["Rollenbezeichnung"]["content"]
                ),
                type=CompanyRelationshipEnum.COMPANY,
            )

        person_name = PersonName(
            firstname=full_name["Vorname"],
            lastname=full_name["Nachname"],
        )
        date_of_birth = parse_date_of_birth(data)
        address = person["Anschrift"]
        # With several addresses the last one is used.
        city = address[-1]["Ort"] if isinstance(address, list) else address["Ort"]
        return PersonToCompanyRelationship(  # type: ignore
            name=person_name,
            date_of_birth=date_of_birth,
            location=Location(city=city),
            role=RelationshipRoleEnum(data["Rolle"]["Rollenbezeichnung"]["content"]),
            type=CompanyRelationshipEnum.PERSON,
        )

    if "Organisation" in party:
        role = RelationshipRoleEnum(data["Rolle"]["Rollenbezeichnung"]["content"])
        organisation = party["Organisation"]
        org_name = remove_traling_and_leading_quotes(
            organisation["Bezeichnung"]["Bezeichnung_Aktuell"]
        )
        address = organisation["Anschrift"]
        # Only the city is mandatory; the other address parts default to None.
        location = Location(
            city=address["Ort"],
            street=address["Strasse"] if "Strasse" in address else None,
            house_number=address["Hausnummer"] if "Hausnummer" in address else None,
            zip_code=address["Postleitzahl"] if "Postleitzahl" in address else None,
        )
        return CompanyToCompanyRelationship(  # type: ignore
            role=role,
            name=org_name,
            location=location,
            type=CompanyRelationshipEnum.COMPANY,
        )

    return None
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def loc_from_beteiligung(data: dict) -> Location:
    """Build the company location from the first "Beteiligung" entry.

    Args:
        data (dict): Data export

    Returns:
        Location: location
    """
    address = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][
        0
    ]["Beteiligter"]["Organisation"]["Anschrift"]

    street = None
    house_number = None
    if "Strasse" in address:
        raw_street = address["Strasse"]
        trailing_digits = re.findall(r".(\d+)$", raw_street)
        if len(trailing_digits) == 1:
            # The street field ends in digits: split them off as the house
            # number and append a separate "Hausnummer" value if there is one.
            house_number = trailing_digits[0]
            street = raw_street[: -len(house_number)]
            if "Hausnummer" in address:
                house_number += address["Hausnummer"]
        else:
            street = raw_street
            if "Hausnummer" in address:
                house_number = address["Hausnummer"]

    return Location(
        city=address["Ort"],
        zip_code=address["Postleitzahl"],
        street=normalize_street(street),  # type: ignore
        house_number=house_number,
    )
def name_from_beteiligung(data: dict) -> str:
    """Read the company name from the first "Beteiligung" entry of an export.

    Args:
        data (dict): Data export

    Returns:
        str: Company name with leading and trailing quotes removed
    """
    first_party = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ][0]["Beteiligter"]
    current_name = first_party["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
    return remove_traling_and_leading_quotes(current_name)
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
    """Determine the company type of a given Unternehmensregister export.

    The "Rechtsform" from the export is used if present. Otherwise the type
    is guessed from the suffix of the company name.

    Args:
        company_name (str): Name of the company as a fallback solution
        data (dict): Data export

    Returns:
        CompanyTypeEnum | None: Company type if found
    """
    try:
        return CompanyTypeEnum(
            data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
                "Rechtstraeger"
            ]["Rechtsform"]["content"]
        )
    except KeyError:
        # Name suffixes and the legal form they stand for, checked in order.
        fallbacks = (
            (
                ("GmbH", "UG", "UG (haftungsbeschränkt)"),
                "Gesellschaft mit beschränkter Haftung",
            ),
            (("SE",), "Europäische Aktiengesellschaft (SE)"),
            (("KG",), "Kommanditgesellschaft"),
        )
        for suffixes, legal_form in fallbacks:
            if company_name.endswith(suffixes):
                return CompanyTypeEnum(legal_form)
        return None
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
    """Determine the company capital from the given Unternehmensregister export.

    Args:
        data (dict): Data export
        company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')

    Returns:
        Capital | None: Company Capital if found
    """
    register_data = data["XJustiz_Daten"]["Fachdaten_Register"]
    # Early return
    if "Zusatzangaben" not in register_data:
        return None

    capital: dict = {"Zahl": 0.0, "Waehrung": ""}
    if company_type == CompanyTypeEnum.KG:
        capital_type = "Hafteinlage"
        limited_partners = register_data["Zusatzangaben"]["Personengesellschaft"][
            "Zusatz_KG"
        ]["Daten_Kommanditist"]
        if isinstance(limited_partners, list):
            # Sum the contributions; the currency of the last entry is kept.
            for partner in limited_partners:
                # TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
                capital["Zahl"] += float(partner["Hafteinlage"]["Zahl"])
                capital["Waehrung"] = partner["Hafteinlage"]["Waehrung"]
        elif isinstance(limited_partners, dict):
            capital = limited_partners["Hafteinlage"]
    elif company_type in [
        CompanyTypeEnum.GMBH,
        CompanyTypeEnum.SE,
        CompanyTypeEnum.AG,
        CompanyTypeEnum.KGaA,
        CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
        CompanyTypeEnum.OHG,
    ]:
        extras = register_data["Zusatzangaben"]
        if "Kapitalgesellschaft" in extras:
            details = extras["Kapitalgesellschaft"]
        else:
            details = extras["Personengesellschaft"]

        if "Zusatz_GmbH" in details:
            capital_type = "Stammkapital"
            capital = details["Zusatz_GmbH"]["Stammkapital"]
        elif "Zusatz_Aktiengesellschaft" in details:
            capital_type = "Grundkapital"
            capital = details["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"]
    elif company_type in [
        CompanyTypeEnum.EINZELKAUFMANN,
        CompanyTypeEnum.EG,
        CompanyTypeEnum.PARTNERSCHAFT,
        CompanyTypeEnum.PARTNERGESELLSCHAFT,
        CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
        None,
    ]:
        return None

    # Catch entries having the dict but with null values (this also covers
    # the untouched default above).
    if not all(capital.values()):
        return None
    return Capital(  # type: ignore
        value=float(capital["Zahl"]),
        currency=CurrencyEnum(capital["Waehrung"]),
        type=CapitalTypeEnum(capital_type),
    )
def map_business_purpose(data: dict) -> str | None:
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Business purpose if found
"""
try:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gegenstand_oder_Geschaeftszweck"
]
except KeyError:
return None
def extract_date_from_string(value: str) -> str | None:
    """Find a single date in the given text and return it in ISO format.

    Two notations are recognised: ``D.M.YYYY`` (converted with
    ``transform_date_to_iso``) and ``YYYY-M-D`` (returned as found). A
    notation only counts if it occurs exactly once, and a result is only
    returned if exactly one notation counts.

    Args:
        value (str): Input text

    Returns:
        str | None: Date in ISO format, None if not found
    """
    # (pattern, converter) pairs; a converter of None keeps the match as is.
    patterns = (
        (r"\d{1,2}\.\d{1,2}\.\d{4}", transform_date_to_iso),
        (r"\d{4}-\d{1,2}-\d{1,2}", None),
    )
    candidates = []
    for pattern, to_iso in patterns:
        hits = re.findall(pattern, value)
        if len(hits) != 1:
            continue
        candidates.append(hits[0] if to_iso is None else to_iso(hits[0]))
    return candidates[0] if len(candidates) == 1 else None
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Founding date if found
"""
text = str(data)
entry_date = re.findall(
r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0][1])
entry_date = re.findall(
r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0])
if (
"Gruendungsmetadaten"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
):
return extract_date_from_string(
data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gruendungsmetadaten"
]["Gruendungsdatum"]
)
# No reliable answer
return None
def map_company_id(data: dict) -> CompanyID:
    """Build the company ID from an export.

    The register number comes from "Instanzdaten"; the district court is
    taken from the second "Beteiligung" entry, which may be an organisation
    or a natural person.

    Args:
        data (dict): Data export

    Returns:
        CompanyID: ID of the company
    """
    proceedings = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]
    hr_number = proceedings["Instanzdaten"]["Aktenzeichen"]

    court = proceedings["Beteiligung"][1]["Beteiligter"]
    if "Organisation" in court:
        court_name = court["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
    else:
        court_name = court["Natuerliche_Person"]["Voller_Name"]["Nachname"]
    if "Organisation" in court:
        court_city = court["Organisation"]["Sitz"]["Ort"]
    else:
        court_city = court["Natuerliche_Person"]["Anschrift"]["Ort"]

    return CompanyID(
        hr_number=hr_number,
        district_court=DistrictCourt(name=court_name, city=court_city),
    )
def map_last_update(data: dict) -> str:
    """Read the date of the last register entry from an export.

    Args:
        data (dict): Unternehmensregister export

    Returns:
        str: Last update date
    """
    excerpt = data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]
    return excerpt["letzte_Eintragung"]
def map_co_relation(data: dict) -> dict:
    """Move a "c/o" part of location.street into a care-of relationship.

    The street is split at commas. A part starting with "c/o" is removed
    from the street and, if it names a company, added to
    ``data["relationships"]``. The remaining parts are joined back together
    without a separator.

    Args:
        data (dict): Company dict

    Returns:
        dict: Modified Company dict
    """
    location = data["location"]
    if location.street is None:
        return data

    segments = location.street.split(",")
    care_of_name = None
    care_of_pos = None
    for pos, segment in enumerate(segments):
        hits = re.findall(r"^c\/o(.*)$", segment.strip())
        if len(hits) == 1:
            # If several parts match, the last one wins.
            care_of_name = hits[0].strip()
            care_of_pos = pos

    if care_of_pos is not None:
        segments.pop(care_of_pos)
    cleaned_street = "".join(segments).strip()
    location.street = cleaned_street

    if care_of_name:
        data["relationships"].append(
            CompanyToCompanyRelationship(
                RelationshipRoleEnum.CARE_OF,  # type: ignore
                Location(
                    location.city,
                    cleaned_street,
                    location.house_number,
                    location.zip_code,
                ),
                CompanyRelationshipEnum.COMPANY,  # type: ignore
                care_of_name,
            )
        )
    return data
def map_unternehmensregister_json(data: dict) -> Company:
    """Map a structured Unternehmensregister export to a Company.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    # TODO Refactor mapping - this is a nightmare...
    company_name = name_from_beteiligung(data)
    result: dict = {
        "relationships": [],
        "id": map_company_id(data),
        "name": company_name,
        "location": loc_from_beteiligung(data),
        "last_update": map_last_update(data),
    }
    result["company_type"] = map_rechtsform(company_name, data)
    result["capital"] = map_capital(data, result["company_type"])
    result["business_purpose"] = map_business_purpose(data)
    result["founding_date"] = map_founding_date(data)

    # Entries 0 and 1 are read by the helpers above; the remaining ones are
    # mapped as stakeholders.
    entries = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"]
    for position in range(2, len(entries)):
        # NOTE(review): parse_stakeholder may return None, which is appended
        # as-is — confirm consumers tolerate it.
        result["relationships"].append(parse_stakeholder(entries[position]))

    result = map_co_relation(result)
    return Company(**result)
if __name__ == "__main__":
    from loguru import logger

    # Map every exported json file and write the result to "transformed";
    # the first failure is logged and stops the run with exit code 1.
    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
    export_dir = f"{base_path}/export"
    for file in tqdm(glob.glob1(export_dir, "*.json")):
        path = os.path.join(export_dir, file)
        with open(path, encoding="utf-8") as file_object:
            try:
                raw = json.loads(file_object.read())
                company: Company = map_unternehmensregister_json(raw)

                # File name: alphanumeric characters of the company name,
                # capped at 50 characters.
                name = "".join(e for e in company.name if e.isalnum())[:50]
                target = f"{base_path}/transformed/{name}.json"
                with open(target, "w+", encoding="utf-8") as export_file:
                    json.dump(
                        dataclasses.asdict(company), export_file, ensure_ascii=False
                    )
            except Exception as e:
                logger.error(e)
                logger.error(f"Error in processing {path}")
                sys.exit(1)

View File

@ -0,0 +1 @@
"""Transform Unternehmensregister data to Transparenzregister API."""

View File

@ -0,0 +1,256 @@
"""Common functions for data transformation."""
import abc
import re
import typing
from collections.abc import Sequence
from aki_prj23_transparenzregister.models.company import (
Capital,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
Location,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.string_tools import (
transform_date_to_iso,
)
def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any:
    """Follow a list of keys through nested data and return what is found.

    Args:
        data (dict): Data export
        path (Sequence[str | int | object]): Keys to apply one after another

    Raises:
        KeyError: If a key is missing; the message names that key

    Returns:
        any: Value at the end of the path
    """
    node: typing.Any = data
    for step in path:
        try:
            node = node[step]
        except KeyError as err:
            raise KeyError(f"Key {step} not found") from err
    return node
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def extract_date_from_string(value: str) -> str | None:
    """Find a single date in the given text and return it in ISO format.

    Two notations are recognised: ``D.M.YYYY`` (converted with
    ``transform_date_to_iso``) and ``YYYY-M-D`` (returned as found). A
    notation only counts if it occurs exactly once, and a result is only
    returned if exactly one notation counts.

    Args:
        value (str): Input text

    Returns:
        str | None: Date in ISO format, None if not found
    """
    # (pattern, converter) pairs; a converter of None keeps the match as is.
    patterns = (
        (r"\d{1,2}\.\d{1,2}\.\d{4}", transform_date_to_iso),
        (r"\d{4}-\d{1,2}-\d{1,2}", None),
    )
    candidates = []
    for pattern, to_iso in patterns:
        hits = re.findall(pattern, value)
        if len(hits) != 1:
            continue
        candidates.append(hits[0] if to_iso is None else to_iso(hits[0]))
    return candidates[0] if len(candidates) == 1 else None
def map_co_relation(data: dict) -> dict:
    """Move a "c/o" part of location.street into a care-of relationship.

    The street is split at commas. A part starting with "c/o" is removed
    from the street and, if it names a company, added to
    ``data["relationships"]``. The remaining parts are joined back together
    without a separator.

    Args:
        data (dict): Company dict

    Returns:
        dict: Modified Company dict
    """
    location = data["location"]
    if location.street is None:
        return data

    segments = location.street.split(",")
    care_of_name = None
    care_of_pos = None
    for pos, segment in enumerate(segments):
        hits = re.findall(r"^c\/o(.*)$", segment.strip())
        if len(hits) == 1:
            # If several parts match, the last one wins.
            care_of_name = hits[0].strip()
            care_of_pos = pos

    if care_of_pos is not None:
        segments.pop(care_of_pos)
    cleaned_street = "".join(segments).strip()
    location.street = cleaned_street

    if care_of_name:
        data["relationships"].append(
            CompanyToCompanyRelationship(
                RelationshipRoleEnum.CARE_OF,  # type: ignore
                Location(
                    location.city,
                    cleaned_street,
                    location.house_number,
                    location.zip_code,
                ),
                CompanyRelationshipEnum.COMPANY,  # type: ignore
                care_of_name,
            )
        )
    return data
class BaseTransformer(metaclass=abc.ABCMeta):
    """Abstract interface for mapping Unternehmensregister exports to the Transparenzregister model.

    Every method is abstract; a concrete subclass has to implement all of
    them before it can be instantiated.
    """

    @abc.abstractmethod
    def parse_date_of_birth(self, data: dict) -> str | None:
        """Retrieve the date of birth from a stakeholder entry if possible.

        Args:
            data (dict): Stakeholder data

        Returns:
            str | None: date of birth or None if not found
        """

    @abc.abstractmethod
    def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
        """Extract the company stakeholder/relation from a single "Beteiligung".

        Args:
            data (dict): Data export

        Returns:
            CompanyRelationship | None: Relationship if it could be processed
        """

    @abc.abstractmethod
    def loc_from_beteiligung(self, data: dict) -> Location:
        """Extract the company location from the first relationship in the export.

        Args:
            data (dict): Data export

        Returns:
            Location: location
        """

    @abc.abstractmethod
    def name_from_beteiligung(self, data: dict) -> str:
        """Extract the company name from an export by using the first relationship found.

        Args:
            data (dict): Data export

        Returns:
            str: Company name
        """

    @abc.abstractmethod
    def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
        """Extract the company type from a given Unternehmensregister export.

        Args:
            company_name (str): Name of the company as a fallback solution
            data (dict): Data export

        Returns:
            CompanyTypeEnum | None: Company type if found
        """

    @abc.abstractmethod
    def map_capital(self, data: dict, company_type: CompanyTypeEnum) -> Capital | None:
        """Extract the company capital from the given Unternehmensregister export.

        Args:
            data (dict): Data export
            company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')

        Returns:
            Capital | None: Company Capital if found
        """

    @abc.abstractmethod
    def map_business_purpose(self, data: dict) -> str | None:
        """Extract the "Geschäftszweck" from a given Unternehmensregister export.

        Args:
            data (dict): Data export

        Returns:
            str | None: Business purpose if found
        """

    @abc.abstractmethod
    def map_founding_date(self, data: dict) -> str | None:
        """Extract the founding date from a given Unternehmensregister export.

        Args:
            data (dict): Data export

        Returns:
            str | None: Founding date if found
        """

    @abc.abstractmethod
    def map_company_id(self, data: dict) -> CompanyID:
        """Retrieve the company ID from an export.

        Args:
            data (dict): Data export

        Returns:
            CompanyID: ID of the company
        """

    @abc.abstractmethod
    def map_last_update(self, data: dict) -> str:
        """Extract the last update date from an export.

        Args:
            data (dict): Unternehmensregister export

        Returns:
            str: Last update date
        """

    @abc.abstractmethod
    def map_unternehmensregister_json(self, data: dict) -> Company:
        """Map a structured Unternehmensregister export to a Company.

        Args:
            data (dict): Data export

        Returns:
            Company: Transformed data
        """

View File

@ -0,0 +1,102 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import dataclasses
import glob
import json
import os
import sys
import xmltodict
from loguru import logger
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import Company
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
BaseTransformer,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import (
v1,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import (
v3,
)
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
    """Convert all xml files in a directory to json files.

    The source directory is searched recursively. Every result is written
    flat into ``target_dir`` under the source file name with a ``.json``
    extension. A file that cannot be converted is logged and skipped.

    Args:
        source_dir (str): Directory hosting the xml files
        target_dir (str): Target directory to move json files to
    """
    # Join with a separator: plain concatenation turned "<dir>**/*.xml" into
    # a prefix match on the directory name instead of a recursive search.
    pattern = os.path.join(glob.escape(source_dir), "**", "*.xml")
    for match in glob.glob(pattern, recursive=True):
        source_path = os.path.normpath(match)
        stem = os.path.splitext(os.path.basename(source_path))[0]
        target_path = os.path.join(target_dir, f"{stem}.json")

        try:
            with open(source_path, encoding="utf-8") as source_file:
                # deepcode ignore HandleUnicode: Weird XML format no other solution
                data = xmltodict.parse(source_file.read().encode())
            with open(target_path, "w", encoding="utf-8") as json_file:
                json_file.write(json.dumps(data))
        except Exception as e:
            # Best effort: one broken file must not stop the others.
            logger.error(e)
def determine_version(data: dict) -> BaseTransformer:
    """Determine Unternehmensregister data API version of given entry.

    An entry with a top-level "XJustiz_Daten" key is handled as v1. An entry
    whose first top-level value contains "tns:nachrichtenkopf" is handled
    as v3.

    Args:
        data (dict): Unternehmensregister data

    Raises:
        ValueError: If version could not be determined

    Returns:
        BaseTransformer: Transformer for the detected version
    """
    if "XJustiz_Daten" in data:
        return v1.V1_Transformer()

    # Look at the value under the first key. An empty export or a
    # non-mapping value falls through to the ValueError below instead of
    # raising IndexError/TypeError.
    root = next(iter(data.values()), None)
    if isinstance(root, dict) and "tns:nachrichtenkopf" in root:
        return v3.V3_Transformer()

    raise ValueError("Could not determine Unternehmensregister version.")
def map_unternehmensregister_json(data: dict) -> Company:
    """Map a structured Unternehmensregister export to a Company.

    The export is handed to the transformer that ``determine_version``
    selects for it.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    return determine_version(data).map_unternehmensregister_json(data)
if __name__ == "__main__":
    # Map every exported json file with the transformer matching its version
    # and write the result to "transformed"; the first failure is logged and
    # stops the run with exit code 1.
    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
    export_dir = f"{base_path}/export"
    for file in tqdm(glob.glob1(export_dir, "*.json")):
        path = os.path.join(export_dir, file)
        with open(path, encoding="utf-8") as file_object:
            try:
                data = json.loads(file_object.read())
                transformer: BaseTransformer = determine_version(data)
                company: Company = transformer.map_unternehmensregister_json(data)

                # File name: alphanumeric characters of the company name,
                # capped at 50 characters.
                name = "".join(e for e in company.name if e.isalnum())[:50]
                target = f"{base_path}/transformed/{name}.json"
                with open(target, "w+", encoding="utf-8") as export_file:
                    json.dump(
                        dataclasses.asdict(company), export_file, ensure_ascii=False
                    )
            except Exception as e:
                logger.error(e)
                logger.error(f"Error in processing {path}")
                sys.exit(1)

View File

@ -0,0 +1 @@
"""Module for transforming Unternehmensregister data from v1 to Transparenzregister API data model."""

View File

@ -0,0 +1,458 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import re
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
BaseTransformer,
extract_date_from_string,
map_co_relation,
normalize_street,
)
from aki_prj23_transparenzregister.utils.string_tools import (
remove_traling_and_leading_quotes,
transform_date_to_iso,
)
class V1_Transformer(BaseTransformer): # noqa: N801
"""Transformer for data exports from Unternehmensregister (v1)."""
def parse_date_of_birth(self, data: dict) -> str | None:
    """Read a stakeholder's date of birth, if the entry carries one.

    Args:
        data (dict): Stakeholder data

    Returns:
        str | None: date of birth or None if not found
    """
    person = data["Beteiligter"]["Natuerliche_Person"]
    if "Geburt" not in person:
        return None
    birth_date = person["Geburt"]["Geburtsdatum"]
    # Anything other than a plain string counts as "not found".
    return birth_date if isinstance(birth_date, str) else None
def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
    """Build the relationship described by a single "Beteiligung" entry.

    Args:
        data (dict): Data export

    Returns:
        CompanyRelationship | None: Relationship if it could be processed
    """
    party = data["Beteiligter"]

    if "Natuerliche_Person" in party:
        person = party["Natuerliche_Person"]
        full_name = person["Voller_Name"]

        # It's a Company serving as a "Kommanditist" or similar
        if full_name["Vorname"] is None:
            company_name = remove_traling_and_leading_quotes(full_name["Nachname"])
            address = person["Anschrift"]
            # When several addresses are listed, the last one is used.
            city = address[-1]["Ort"] if isinstance(address, list) else address["Ort"]
            return CompanyToCompanyRelationship(  # type: ignore
                name=company_name,
                location=Location(city=city),
                role=RelationshipRoleEnum(
                    data["Rolle"]["Rollenbezeichnung"]["content"]
                ),
                type=CompanyRelationshipEnum.COMPANY,
            )

        person_name = PersonName(
            firstname=full_name["Vorname"],
            lastname=full_name["Nachname"],
        )
        date_of_birth = self.parse_date_of_birth(data)
        address = person["Anschrift"]
        # When several addresses are listed, the last one is used.
        city = address[-1]["Ort"] if isinstance(address, list) else address["Ort"]
        return PersonToCompanyRelationship(  # type: ignore
            name=person_name,
            date_of_birth=date_of_birth,
            location=Location(city=city),
            role=RelationshipRoleEnum(data["Rolle"]["Rollenbezeichnung"]["content"]),
            type=CompanyRelationshipEnum.PERSON,
        )

    if "Organisation" in party:
        organisation = party["Organisation"]
        role = RelationshipRoleEnum(data["Rolle"]["Rollenbezeichnung"]["content"])
        organisation_name = remove_traling_and_leading_quotes(
            organisation["Bezeichnung"]["Bezeichnung_Aktuell"]
        )
        address = organisation["Anschrift"]
        # Only the city is mandatory; the remaining fields default to None.
        location = Location(
            city=address["Ort"],
            street=address["Strasse"] if "Strasse" in address else None,
            house_number=address["Hausnummer"] if "Hausnummer" in address else None,
            zip_code=address["Postleitzahl"] if "Postleitzahl" in address else None,
        )
        return CompanyToCompanyRelationship(  # type: ignore
            role=role,
            name=organisation_name,
            location=location,
            type=CompanyRelationshipEnum.COMPANY,
        )

    # Neither a natural person nor an organisation: nothing to map.
    return None
def loc_from_beteiligung(self, data: dict) -> Location:
    """Read the company location from the first "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        Location: location
    """
    first_party = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ][0]["Beteiligter"]
    address = first_party["Organisation"]["Anschrift"]

    street = None
    house_number = None
    if "Strasse" in address:
        raw_street = address["Strasse"]
        # Digits at the very end of the street field are taken as house number.
        trailing_digits = re.findall(r".(\d+)$", raw_street)
        if len(trailing_digits) != 1:
            if "Hausnummer" in address:
                house_number = address["Hausnummer"]
            street = raw_street
        else:
            house_number = trailing_digits[0]
            street = raw_street[: -len(house_number)]
            # A separate house number field is appended to the digits found.
            if "Hausnummer" in address:
                house_number += address["Hausnummer"]

    return Location(
        city=address["Ort"],
        zip_code=address["Postleitzahl"],
        street=normalize_street(street),  # type: ignore
        house_number=house_number,
    )
def name_from_beteiligung(self, data: dict) -> str:
    """Read the company name from the first "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        str: Company name
    """
    first_party = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ][0]["Beteiligter"]
    current_name = first_party["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
    return remove_traling_and_leading_quotes(current_name)
def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
    """Determine the company type of an Unternehmensregister export.

    The structured "Rechtsform" entry is used when present; otherwise the
    type is guessed from the suffix of the company name.

    Args:
        company_name (str): Name of the company as a fallback solution
        data (dict): Data export

    Returns:
        CompanyTypeEnum | None: Company type if found
    """
    try:
        base_data = data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
        return CompanyTypeEnum(base_data["Rechtstraeger"]["Rechtsform"]["content"])
    except KeyError:
        pass

    # Fallback: name suffixes checked in order, first match wins.
    suffix_table = (
        (
            ("GmbH", "UG", "UG (haftungsbeschränkt)"),
            "Gesellschaft mit beschränkter Haftung",
        ),
        (("SE",), "Europäische Aktiengesellschaft (SE)"),
        (("KG",), "Kommanditgesellschaft"),
    )
    for suffixes, legal_form in suffix_table:
        if company_name.endswith(suffixes):
            return CompanyTypeEnum(legal_form)
    return None
def map_capital(self, data: dict, company_type: CompanyTypeEnum) -> Capital | None:
    """Read the company capital from the given Unternehmensregister export.

    Args:
        data (dict): Data export
        company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')

    Returns:
        Capital | None: Company Capital if found
    """
    register = data["XJustiz_Daten"]["Fachdaten_Register"]
    # Early return
    if "Zusatzangaben" not in register:
        return None
    extras = register["Zusatzangaben"]

    # Placeholder with falsy values; it is rejected below unless overwritten.
    capital: dict = {"Zahl": 0.0, "Waehrung": ""}

    if company_type == CompanyTypeEnum.KG:
        capital_type = "Hafteinlage"
        limited_partners = extras["Personengesellschaft"]["Zusatz_KG"][
            "Daten_Kommanditist"
        ]
        if isinstance(limited_partners, list):
            for partner in limited_partners:
                # TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
                capital["Zahl"] += float(partner["Hafteinlage"]["Zahl"])
                capital["Waehrung"] = partner["Hafteinlage"]["Waehrung"]
        elif isinstance(limited_partners, dict):
            capital = limited_partners["Hafteinlage"]
    elif company_type in [
        CompanyTypeEnum.GMBH,
        CompanyTypeEnum.SE,
        CompanyTypeEnum.AG,
        CompanyTypeEnum.KGaA,
        CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
        CompanyTypeEnum.OHG,
    ]:
        # Use "Kapitalgesellschaft" when present, else "Personengesellschaft".
        if "Kapitalgesellschaft" in extras:
            base = extras["Kapitalgesellschaft"]
        else:
            base = extras["Personengesellschaft"]
        if "Zusatz_GmbH" in base:
            capital_type = "Stammkapital"
            capital = base["Zusatz_GmbH"]["Stammkapital"]
        elif "Zusatz_Aktiengesellschaft" in base:
            capital_type = "Grundkapital"
            capital = base["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"]
    elif company_type in [
        CompanyTypeEnum.EINZELKAUFMANN,
        CompanyTypeEnum.EG,
        CompanyTypeEnum.PARTNERSCHAFT,
        CompanyTypeEnum.PARTNERGESELLSCHAFT,
        CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
        None,
    ]:
        return None

    # Catch entries having the dict but with null values
    if not all(capital.values()):
        return None
    return Capital(  # type: ignore
        value=float(capital["Zahl"]),
        currency=CurrencyEnum(capital["Waehrung"]),
        type=CapitalTypeEnum(capital_type),
    )
def map_business_purpose(self, data: dict) -> str | None:
    """Read the "Geschäftszweck" from a given Unternehmensregister export.

    Args:
        data (dict): Data export

    Returns:
        str | None: Business purpose if found
    """
    try:
        base_data = data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
        purpose = base_data["Gegenstand_oder_Geschaeftszweck"]
    except KeyError:
        # Any missing level of the path means there is no purpose entry.
        return None
    return purpose
def map_founding_date(self, data: dict) -> str | None:
    """Determine the founding date of a given Unternehmensregister export.

    The stringified export is searched for a unique "Tag der ersten
    Eintragung" date, then for a unique "Gesellschaftsvertrag vom" date;
    only afterwards is the structured "Gruendungsmetadaten" entry consulted.

    Args:
        data (dict): Data export

    Returns:
        str | None: Founding date if found
    """
    text = str(data)

    first_entry_hits = re.findall(
        r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
    )
    if len(first_entry_hits) == 1:
        # Two capture groups: each hit is a tuple, the date is its second item.
        return transform_date_to_iso(first_entry_hits[0][1])

    contract_hits = re.findall(
        r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
    )
    if len(contract_hits) == 1:
        return transform_date_to_iso(contract_hits[0])

    base_data = data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
    if "Gruendungsmetadaten" in base_data:
        return extract_date_from_string(
            base_data["Gruendungsmetadaten"]["Gruendungsdatum"]
        )
    # No reliable answer
    return None
def map_company_id(self, data: dict) -> CompanyID:
    """Build the company ID (HR number plus district court) from an export.

    Args:
        data (dict): Data export

    Returns:
        CompanyID: ID of the company
    """
    proceedings = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]
    hr_number = proceedings["Instanzdaten"]["Aktenzeichen"]

    # The second "Beteiligung" entry is read as the district court.
    court = proceedings["Beteiligung"][1]["Beteiligter"]
    if "Organisation" in court:
        court_name = court["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
        court_city = court["Organisation"]["Sitz"]["Ort"]
    else:
        # Fall back to the "Natuerliche_Person" form of the entry.
        court_name = court["Natuerliche_Person"]["Voller_Name"]["Nachname"]
        court_city = court["Natuerliche_Person"]["Anschrift"]["Ort"]

    return CompanyID(
        hr_number=hr_number,
        district_court=DistrictCourt(name=court_name, city=court_city),
    )
def map_last_update(self, data: dict) -> str:
    """Read the date of the last register entry from the export.

    Args:
        data (dict): Unternehmensregister export

    Returns:
        str: Last update date
    """
    excerpt = data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]
    return excerpt["letzte_Eintragung"]
def map_unternehmensregister_json(self, data: dict) -> Company:
    """Assemble a Company from a v1 export by combining the helper methods.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    company_id = self.map_company_id(data)
    name = self.name_from_beteiligung(data)
    location = self.loc_from_beteiligung(data)
    last_update = self.map_last_update(data)
    company_type = self.map_rechtsform(name, data)
    result: dict = {
        "relationships": [],
        "id": company_id,
        "name": name,
        "location": location,
        "last_update": last_update,
        "company_type": company_type,
        "capital": self.map_capital(data, company_type),
        "business_purpose": self.map_business_purpose(data),
        "founding_date": self.map_founding_date(data),
    }

    parties = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"]
    # Entries 0 and 1 are consumed by the name/location and company-id helpers;
    # stakeholders start at index 2.
    for party in parties[2:]:
        result["relationships"].append(self.parse_stakeholder(party))

    result = map_co_relation(result)
    return Company(**result)

View File

@ -0,0 +1 @@
"""Transforms data from the Unternehmensregister v3 API to the data model of the Transparenzregister API."""

View File

@ -0,0 +1,60 @@
"""RoleMapper for Unternehmensregister v3 API."""
import os
from pathlib import Path
import xmltodict
from aki_prj23_transparenzregister.models.company import RelationshipRoleEnum
class RoleMapper:
    """Translate Unternehmensregister v3 role codes to RelationshipRoleEnum values."""

    # Shared instance handed out by mapper(); created on first use.
    singleton = None

    def __init__(self) -> None:
        """Load the role code list from the bundled XSD schema file."""
        # TODO Automated file retrieval
        schema_path = os.path.join(
            os.path.dirname(Path(__file__)),
            "assets",
            "xjustiz_0040_cl_rollenbezeichnung_3_3.xsd",
        )
        with open(schema_path, encoding="utf-8") as schema_file:
            schema = xmltodict.parse(schema_file.read())
        enumerations = schema["xs:schema"]["xs:simpleType"]["xs:restriction"][
            "xs:enumeration"
        ]
        # Role code ("@value") -> role label ("wert" of the annotation).
        self.dictionary = {
            item["@value"]: item["xs:annotation"]["xs:appinfo"]["wert"]
            for item in enumerations
        }

    @staticmethod
    def mapper() -> "RoleMapper":
        """Return the shared RoleMapper, creating it on the first call.

        Returns:
            RoleMapper: Singleton instance
        """
        instance = RoleMapper.singleton
        if instance is None:
            instance = RoleMapper.singleton = RoleMapper()
        return instance

    def get(self, key: str) -> RelationshipRoleEnum:
        """Look up the role enum for a role code.

        Args:
            key (str): Key to map

        Returns:
            RelationshipRoleEnum: Mapped value
        """
        role_label = self.dictionary[key]
        return RelationshipRoleEnum(role_label)
if __name__ == "__main__":
    from loguru import logger

    # Manual smoke check: load the schema and log the mapping of role 201.
    mapper = RoleMapper()
    mapped_role = mapper.get("201")
    logger.info(f"Mapped value for role 201 - {mapped_role}")

View File

@ -0,0 +1,561 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import re
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.common import (
BaseTransformer,
map_co_relation,
normalize_street,
traversal,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import (
RoleMapper,
)
from aki_prj23_transparenzregister.utils.string_tools import (
remove_traling_and_leading_quotes,
transform_date_to_iso,
)
class V3_Transformer(BaseTransformer): # noqa: N801
"""Transformer for data exports from Unternehmensregister (v3)."""
def parse_date_of_birth(self, data: dict) -> str | None:
    """Read a stakeholder's date of birth, if the entry carries one.

    Args:
        data (dict): Stakeholder data

    Returns:
        str | None: date of birth or None if not found
    """
    person = data["tns:beteiligter"]["tns:auswahl_beteiligter"][
        "tns:natuerlichePerson"
    ]
    if "tns:geburt" not in person:
        return None
    birth_date = person["tns:geburt"]["tns:geburtsdatum"]
    # Anything other than a plain string counts as "not found".
    return birth_date if isinstance(birth_date, str) else None
def map_role_id_to_enum(self, role_id: str) -> RelationshipRoleEnum:
    """Translate an Unternehmensregister role ID via the shared RoleMapper.

    Args:
        role_id (str): Unternehmensregister role ID

    Returns:
        RelationshipRoleEnum: Role enum
    """
    return RoleMapper.mapper().get(role_id)
def parse_stakeholder(self, data: dict) -> CompanyRelationship | None:
    """Build the relationship described by a single "Beteiligung" entry.

    Args:
        data (dict): Data export

    Returns:
        CompanyRelationship | None: Relationship if it could be processed
    """
    party = data["tns:beteiligter"]["tns:auswahl_beteiligter"]

    if "tns:natuerlichePerson" in party:
        person = party["tns:natuerlichePerson"]
        full_name = person["tns:vollerName"]

        # It's a Company serving as a "Kommanditist" or similar
        if "tns:vorname" not in full_name:
            company_name = remove_traling_and_leading_quotes(
                full_name["tns:nachname"]
            )
            address = person["tns:anschrift"]
            # When several addresses are listed, the last one is used.
            city = (
                address[-1]["tns:ort"]
                if isinstance(address, list)
                else address["tns:ort"]
            )
            return CompanyToCompanyRelationship(  # type: ignore
                name=company_name,
                location=Location(city=city),
                role=self.map_role_id_to_enum(
                    data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
                ),
                type=CompanyRelationshipEnum.COMPANY,
            )

        person_name = PersonName(
            firstname=full_name["tns:vorname"],
            lastname=full_name["tns:nachname"],
        )
        date_of_birth = self.parse_date_of_birth(data)
        address = person["tns:anschrift"]
        # When several addresses are listed, the last one is used.
        city = (
            address[-1]["tns:ort"] if isinstance(address, list) else address["tns:ort"]
        )
        return PersonToCompanyRelationship(  # type: ignore
            name=person_name,
            date_of_birth=date_of_birth,
            location=Location(city=city),
            role=self.map_role_id_to_enum(
                data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
            ),
            type=CompanyRelationshipEnum.PERSON,
        )

    if "tns:organisation" in party:
        organisation = party["tns:organisation"]
        # Use "tns:anschrift" when present, otherwise "tns:sitz".
        if "tns:anschrift" in organisation:
            address = organisation["tns:anschrift"]
        else:
            address = organisation["tns:sitz"]
        # Only the city is mandatory; the remaining fields default to None.
        location = Location(
            city=address["tns:ort"],
            street=address["tns:strasse"] if "tns:strasse" in address else None,
            house_number=address["tns:hausnummer"]
            if "tns:hausnummer" in address
            else None,
            zip_code=address["tns:postleitzahl"]
            if "tns:postleitzahl" in address
            else None,
        )
        return CompanyToCompanyRelationship(  # type: ignore
            role=self.map_role_id_to_enum(
                data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
            ),
            name=remove_traling_and_leading_quotes(
                organisation["tns:bezeichnung"]["tns:bezeichnung.aktuell"]
            ),
            location=location,
            type=CompanyRelationshipEnum.COMPANY,
        )

    # Neither a natural person nor an organisation: nothing to map.
    return None
def loc_from_beteiligung(self, data: dict) -> Location:
    """Read the company location from the first "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        Location: location
    """
    organisation = traversal(
        data,
        [
            "tns:grunddaten",
            "tns:verfahrensdaten",
            "tns:beteiligung",
            0,
            "tns:beteiligter",
            "tns:auswahl_beteiligter",
            "tns:organisation",
        ],
    )
    # Use "tns:anschrift" when present, otherwise "tns:sitz".
    if "tns:anschrift" in organisation:
        address = organisation["tns:anschrift"]
    else:
        address = organisation["tns:sitz"]
    # Several entries may be listed; only the first one is used.
    if isinstance(address, list):
        address = address[0]

    street = None
    house_number = None
    if "tns:strasse" in address:
        raw_street = address["tns:strasse"]
        # Digits at the very end of the street field are taken as house number.
        trailing_digits = re.findall(r".(\d+)$", raw_street)
        if len(trailing_digits) != 1:
            if "tns:hausnummer" in address:
                house_number = address["tns:hausnummer"]
            street = raw_street
        else:
            house_number = trailing_digits[0]
            street = raw_street[: -len(house_number)]
            # A separate house number field is appended to the digits found.
            if "tns:hausnummer" in address:
                house_number += address["tns:hausnummer"]

    return Location(
        city=address["tns:ort"],
        zip_code=address["tns:postleitzahl"],
        street=normalize_street(street),  # type: ignore
        house_number=house_number,
    )
def name_from_beteiligung(self, data: dict) -> str:
    """Read the company name from the first "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        str: Company name
    """
    name_keys = [
        "tns:grunddaten",
        "tns:verfahrensdaten",
        "tns:beteiligung",
        0,
        "tns:beteiligter",
        "tns:auswahl_beteiligter",
        "tns:organisation",
        "tns:bezeichnung",
        "tns:bezeichnung.aktuell",
    ]
    return remove_traling_and_leading_quotes(traversal(data, name_keys))
def map_rechtsform(self, company_name: str, data: dict) -> CompanyTypeEnum | None:
    """Determine the company type of an Unternehmensregister export.

    The structured "tns:rechtsform" code is used when it can be read and
    mapped; on any failure the type is guessed from the company name suffix.

    Args:
        company_name (str): Name of the company as a fallback solution
        data (dict): Data export

    Returns:
        CompanyTypeEnum | None: Company type if found
    """
    legal_form_keys = [
        "tns:fachdatenRegister",
        "tns:basisdatenRegister",
        "tns:rechtstraeger",
        "tns:angabenZurRechtsform",
        "tns:rechtsform",
        "code",
    ]
    try:
        return CompanyTypeEnum(traversal(data, legal_form_keys))
    except Exception:
        pass

    # Fallback: name suffixes checked in order, first match wins.
    suffix_table = (
        (
            ("GmbH", "UG", "UG (haftungsbeschränkt)"),
            "Gesellschaft mit beschränkter Haftung",
        ),
        (("SE",), "Europäische Aktiengesellschaft (SE)"),
        (("KG",), "Kommanditgesellschaft"),
    )
    for suffixes, legal_form in suffix_table:
        if company_name.endswith(suffixes):
            return CompanyTypeEnum(legal_form)
    return None
def map_capital(  # noqa: PLR0912
    self, data: dict, company_type: CompanyTypeEnum
) -> Capital | None:
    """Read the company capital from the given Unternehmensregister export.

    Args:
        data (dict): Data export
        company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')

    Returns:
        Capital | None: Company Capital if found
    """
    register = data["tns:fachdatenRegister"]
    # Early return
    if "tns:auswahl_zusatzangaben" not in register:
        return None
    extras = register["tns:auswahl_zusatzangaben"]

    # Placeholder whose amount is falsy; it is rejected below unless overwritten.
    capital: dict = {"tns:zahl": 0.0, "tns:waehrung": {"code": None}}

    if company_type == CompanyTypeEnum.KG and "tns:personengesellschaft" in extras:
        capital_type = "Hafteinlage"
        limited_partners = extras["tns:personengesellschaft"]["tns:zusatzKG"][
            "tns:datenKommanditist"
        ]
        if isinstance(limited_partners, list):
            for partner in limited_partners:
                # TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
                contribution = partner["tns:hafteinlage"]
                capital["tns:zahl"] += float(contribution["tns:zahl"])
                capital["tns:waehrung"]["code"] = contribution["tns:waehrung"]["code"]
        elif isinstance(limited_partners, dict):
            capital = limited_partners["tns:hafteinlage"]
    elif company_type in [
        CompanyTypeEnum.GMBH,
        CompanyTypeEnum.SE,
        CompanyTypeEnum.AG,
        CompanyTypeEnum.KGaA,
        CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
        CompanyTypeEnum.OHG,
    ]:
        # Use "tns:kapitalgesellschaft" when present, else "tns:personengesellschaft".
        if "tns:kapitalgesellschaft" in extras:
            base = extras["tns:kapitalgesellschaft"]
        else:
            base = extras["tns:personengesellschaft"]
        if "tns:zusatzGmbH" in base:
            capital_type = "Stammkapital"
            capital = base["tns:zusatzGmbH"]["tns:stammkapital"]
        elif "tns:zusatzAktiengesellschaft" in base:
            capital_type = "Grundkapital"
            capital = base["tns:zusatzAktiengesellschaft"]["tns:grundkapital"][
                "tns:hoehe"
            ]
    elif company_type in [
        CompanyTypeEnum.EINZELKAUFMANN,
        CompanyTypeEnum.EG,
        CompanyTypeEnum.PARTNERSCHAFT,
        CompanyTypeEnum.PARTNERGESELLSCHAFT,
        CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
        None,
    ]:
        return None

    # When several capital entries are listed, only the first one is used.
    if isinstance(capital, list):
        capital = capital[0]
    # Catch entries having the dict but with null values
    if not all(capital.values()):
        return None
    return Capital(  # type: ignore
        value=float(capital["tns:zahl"]),
        currency=CurrencyEnum(capital["tns:waehrung"]["code"]),
        type=CapitalTypeEnum(capital_type),
    )
def map_business_purpose(self, data: dict) -> str | None:
    """Read the "Geschäftszweck" from a given Unternehmensregister export.

    Args:
        data (dict): Data export

    Returns:
        str | None: Business purpose if found
    """
    purpose_keys = [
        "tns:fachdatenRegister",
        "tns:basisdatenRegister",
        "tns:gegenstand",
    ]
    try:
        purpose = traversal(data, purpose_keys)
    except KeyError:
        # A KeyError from the lookup means there is no purpose entry.
        return None
    return purpose
def map_founding_date(self, data: dict) -> str | None:
    """Determine the founding date of a given Unternehmensregister export.

    The stringified export is searched for a unique "Tag der ersten
    Eintragung" date, then for a unique "Gesellschaftsvertrag vom" date;
    only afterwards is the structured "tns:satzungsdatum" entry consulted.

    Args:
        data (dict): Data export

    Returns:
        str | None: Founding date if found
    """
    text = str(data)

    first_entry_hits = re.findall(
        r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
    )
    if len(first_entry_hits) == 1:
        # Two capture groups: each hit is a tuple, the date is its second item.
        return transform_date_to_iso(first_entry_hits[0][1])

    contract_hits = re.findall(
        r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
    )
    if len(contract_hits) == 1:
        return transform_date_to_iso(contract_hits[0])

    base_data = data["tns:fachdatenRegister"]["tns:basisdatenRegister"]
    if "tns:satzungsdatum" in base_data:
        statute = traversal(
            data,
            [
                "tns:fachdatenRegister",
                "tns:basisdatenRegister",
                "tns:satzungsdatum",
            ],
        )
        if "tns:aktuellesSatzungsdatum" in statute:
            return statute["tns:aktuellesSatzungsdatum"]
    # No reliable answer
    return None
def map_hr_number(self, data: dict) -> str:
    """Read the HR number from a given Unternehmensregister export.

    A structured file reference is rendered as "<register code> <number>";
    otherwise the free-text file reference is returned unchanged.

    Args:
        data (dict): Data export

    Raises:
        KeyError: If key not found

    Returns:
        str: HR number
    """
    instance_data = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"]
    choice = instance_data["tns:aktenzeichen"]["tns:auswahl_aktenzeichen"]

    if "tns:aktenzeichen.strukturiert" in choice:
        structured = choice["tns:aktenzeichen.strukturiert"]
        hr_prefix = structured["tns:register"]["code"]
        hr_number = structured["tns:laufendeNummer"]
        return f"{hr_prefix} {hr_number}"
    if "tns:aktenzeichen.freitext" not in choice:
        raise KeyError("Could not find HR number")
    return choice["tns:aktenzeichen.freitext"]
def map_district_court(self, data: dict) -> DistrictCourt:
    """Read the district court from the second "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        DistrictCourt: District court
    """
    court_keys = [
        "tns:grunddaten",
        "tns:verfahrensdaten",
        "tns:beteiligung",
        1,
        "tns:beteiligter",
        "tns:auswahl_beteiligter",
        "tns:organisation",
    ]
    court_name = traversal(
        data, [*court_keys, "tns:bezeichnung", "tns:bezeichnung.aktuell"]
    )
    court_city = traversal(data, [*court_keys, "tns:anschrift", "tns:ort"])
    return DistrictCourt(name=court_name, city=court_city)
def map_company_id(self, data: dict) -> CompanyID:
    """Build the company ID (HR number plus district court) from an export.

    The HR number is read from the file reference; when that lookup raises a
    KeyError, the register number of the first "Beteiligung" is used instead.

    Args:
        data (dict): Data export

    Returns:
        CompanyID: ID of the company
    """
    # Only the HR number lookup has a fallback. Keeping the district court
    # outside the try lets its KeyError surface directly instead of being
    # replaced by an error from the unrelated fallback, and resolves the
    # court once instead of twice.
    try:
        hr_number = self.map_hr_number(data)
    except KeyError:
        hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][
            0
        ]["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:organisation"][
            "tns:registereintragung"
        ]["tns:registernummer"]
    district_court = self.map_district_court(data)
    return CompanyID(hr_number=hr_number, district_court=district_court)  # type: ignore
def map_last_update(self, data: dict) -> str:
    """Read the date of the last register entry from the export.

    Args:
        data (dict): Unternehmensregister export

    Returns:
        str: Last update date
    """
    last_entry_keys = [
        "tns:fachdatenRegister",
        "tns:auszug",
        "tns:letzteEintragung",
    ]
    last_update = traversal(data, last_entry_keys)
    return last_update
def map_unternehmensregister_json(self, data: dict) -> Company:
    """Assemble a Company from a v3 export by combining the helper methods.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    # The payload sits below the first top-level key of the export.
    data = data[list(data)[0]]

    company_id = self.map_company_id(data)
    name = self.name_from_beteiligung(data)
    location = self.loc_from_beteiligung(data)
    last_update = self.map_last_update(data)
    company_type = self.map_rechtsform(name, data)
    result: dict = {
        "relationships": [],
        "id": company_id,
        "name": name,
        "location": location,
        "last_update": last_update,
        "company_type": company_type,
        "capital": self.map_capital(data, company_type),
        "business_purpose": self.map_business_purpose(data),
        "founding_date": self.map_founding_date(data),
    }

    parties = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"]
    # Entries 0 and 1 are consumed by the name/location and district-court
    # helpers; stakeholders start at index 2.
    for party in parties[2:]:
        result["relationships"].append(self.parse_stakeholder(party))

    result = map_co_relation(result)
    return Company(**result)

View File

@ -45,7 +45,6 @@ class CompanyMongoService:
query = {
"id.hr_number": id["hr_number"],
"id.district_court.name": id["district_court"]["name"],
"id.district_court.city": id["district_court"]["city"],
}
with self.lock:
result = list(self.collection.find(query))

View File

@ -0,0 +1,31 @@
"""Testing find_missing_companies.py."""
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.apps import find_missing_companies
def test_import_find_missing_companies() -> None:
    """Smoke test: the find_missing_companies module was imported."""
    assert find_missing_companies
@patch("aki_prj23_transparenzregister.apps.find_missing_companies.MongoConnector")
@patch("aki_prj23_transparenzregister.apps.find_missing_companies.CompanyMongoService")
@patch(
    "aki_prj23_transparenzregister.apps.find_missing_companies.load.load_directory_to_mongo"
)
@patch("aki_prj23_transparenzregister.apps.find_missing_companies.connector")
def test_work(
    connector_mock: Mock,
    load_directory_to_mongo_mock: Mock,
    company_mongo_service_mock: Mock,
    mongo_connector_mock: Mock,
) -> None:
    """Run work() for one company name with its patched collaborators.

    The patch decorators are applied bottom-up, so the mock parameters are
    listed in the reverse order of the decorator stack.
    """
    config_provider_mock = Mock()
    config_provider_mock.session.return_value = Mock()
    load_directory_to_mongo_mock.return_value = 42
    find_missing_companies.work(
        "Atos IT-Dienstleistung und Beratung GmbH", config_provider_mock
    )
    # NOTE(review): no behavior is asserted here; the test only passes when
    # work() returns without raising.
    assert True

View File

@ -86,4 +86,4 @@ def test_wait_for_download_condition() -> None:
def test_scrape() -> None:
    """Run the scraper once against a temporary download directory."""
    with TemporaryDirectory(dir="./") as temp_dir:
        # The directory is passed as a plain path; the superseded call that
        # wrapped it in a list is dropped so the scraper runs only once.
        extract.scrape("GEA Farm Technologies GmbH", temp_dir)

View File

@ -1,4 +1,8 @@
"""Test load utils from Unternehmensregister."""
import json
import tempfile
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
load,
)
@ -6,3 +10,37 @@ from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister im
def test_smoke() -> None:
    """Smoke test: the load module was imported."""
    assert load
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.load.CompanyMongoService"
)
def test_load_directory_to_mongo(mock_company_service: Mock) -> None:
    """A directory holding one company JSON file yields a result of 1."""
    mock_company_service.migration_of_base_data.return_value = None
    company_fixture = {
        "id": {
            "district_court": {
                "name": "Amtsgericht Hamburg",
                "city": "Hamburg",
            },
            "hr_number": "HRB 47899",
        },
        "location": {
            "city": "Hamburg",
            "street": "Heußweg",
            "house_number": "35",
            "zip_code": "20255",
        },
        "name": "Aurelius Immo GmbH",
        "last_update": "2021-07-05",
        "relationships": [],
        "business_purpose": "Erwerb und Verwaltung von Immobilien; Geschäftsführung von Immobilienfonds und anderen Gesellschaften; Dienstleistungen in diesem Zusammenhang.",
        "capital": {"value": 50000, "currency": "DM", "type": "Stammkapital"},
        "company_type": "Gesellschaft mit beschränkter Haftung",
        "founding_date": "1977-03-03",
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Close the fixture file before loading so its content is flushed.
        with open(f"{tmp_dir}/test.json", "w") as fixture_file:
            json.dump(company_fixture, fixture_file)
        result = load.load_directory_to_mongo(tmp_dir, mock_company_service)
    assert result == 1

View File

@ -0,0 +1,140 @@
"""Testing data_extraction/unternehmensregister/transform/common.py."""
import pytest
from aki_prj23_transparenzregister.models.company import (
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
Location,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
common,
)
def test_import_common() -> None:
    """Smoke test: the common module was imported."""
    assert common
def test_traversal() -> None:
    """traversal follows the key path down to the leaf value."""
    nested = {"a": {"b": {"c": "d"}}}
    leaf = common.traversal(nested, ["a", "b", "c"])
    assert leaf == "d"
def test_traversal_raises_key_error() -> None:
    """traversal raises KeyError when the last key of the path is missing."""
    nested = {"a": {"b": {"c": "d"}}}
    missing_leaf_path = ["a", "b", "d"]
    with pytest.raises(KeyError):
        common.traversal(nested, missing_leaf_path)
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (None, None),
        ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"),
        ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"),
        ("Str. des Tests", "Straße des Tests"),
    ],
)
def test_normalize_street(value: str, expected_result: str) -> None:
    """normalize_street expands street abbreviations and passes None through."""
    assert common.normalize_street(value) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        ("", None),
        ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"),
        ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"),
        ("Str. des Tests vom 1999-04-05", "1999-04-05"),
        ("Once upon a midnight dreary while I pondered weak and weary...", None),
        (
            "This company was first founded in 2016-06-10 and then again on 1.5.2004",
            None,
        ),
    ],
)
def test_extract_date_from_string(value: str, expected_result: str) -> None:
    """Check ``common.extract_date_from_string`` against each parametrized case.

    The cases with an empty string, with no date at all and with two dates
    all expect ``None``.

    Args:
        value (str): Free text handed to the function.
        expected_result (str): Expected ISO date string, or ``None``.
    """
    assert common.extract_date_from_string(value) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        # "c/o <name>" in front of the street, location otherwise empty.
        (
            {
                "location": Location(
                    "", "c/o Youco24 Business Center, Abc ffda", None, None
                ),
                "relationships": [],
            },
            {
                "location": Location("", "Abc ffda", None, None),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("", "Abc ffda", None, None),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # "c/o <name>" in front of the street, fully populated location.
        (
            {
                "location": Location(
                    "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # "c/o <name>" behind the street.
        (
            {
                "location": Location(
                    "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # Bare "c/o" without a name: no relationship is expected.
        (
            {
                "location": Location("Iserlohn", "Abc Str., c/o", "42", "58644"),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [],
            },
        ),
    ],
)
def test_map_co_relation(value: dict, expected_result: dict) -> None:
    """Check ``common.map_co_relation`` against each parametrized case.

    Args:
        value (dict): Input with a ``location`` and a ``relationships`` entry.
        expected_result (dict): Expected output with the "c/o" part removed from the street.
    """
    assert common.map_co_relation(value) == expected_result

View File

@ -0,0 +1,24 @@
"""Testing main.py."""
import json
import os
from tempfile import TemporaryDirectory
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
main,
)
def test_transform_xml_to_json() -> None:
with TemporaryDirectory(dir="./") as temp_source_dir:
with open(os.path.join(temp_source_dir, "test.xml"), "w") as file:
xml_input = """<?xml version="1.0" encoding="UTF-8"?>
<test>
<message>Hello World!</message>
</test>
"""
file.write(xml_input)
with TemporaryDirectory(dir="./") as temp_target_dir:
main.transform_xml_to_json(temp_source_dir, temp_target_dir)
with open(os.path.join(temp_target_dir, "test.json")) as file:
json_output = json.load(file)
assert json_output == {"test": {"message": "Hello World!"}}

View File

@ -0,0 +1,13 @@
"""Test role_mapper.py."""
from aki_prj23_transparenzregister.models.company import RelationshipRoleEnum
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import (
RoleMapper,
)
def test_init() -> None:
    """``RoleMapper.mapper()`` is expected to return a ``RoleMapper`` instance."""
    mapper_instance = RoleMapper.mapper()
    assert isinstance(mapper_instance, RoleMapper)
def test_map_role() -> None:
    """Role code "285" is expected to resolve to ``RelationshipRoleEnum.PROKURIST``."""
    mapped_role = RoleMapper.mapper().get("285")
    assert mapped_role == RelationshipRoleEnum.PROKURIST

View File

@ -1,11 +1,6 @@
"""Testing utils/data_extraction/unternehmensregister/transform.py."""
import json
import os
from tempfile import TemporaryDirectory
from unittest.mock import Mock, patch
import pytest
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
@ -21,25 +16,11 @@ from aki_prj23_transparenzregister.models.company import (
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
transform,
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1 import (
V1_Transformer,
)
def test_transform_xml_to_json() -> None:
    """Write one XML file, run ``transform.transform_xml_to_json`` and compare the JSON output."""
    # Both temporary directories are created inside the current working directory.
    with TemporaryDirectory(dir="./") as temp_source_dir:
        with open(os.path.join(temp_source_dir, "test.xml"), "w") as file:
            xml_input = """<?xml version="1.0" encoding="UTF-8"?>
<test>
<message>Hello World!</message>
</test>
"""
            file.write(xml_input)
        with TemporaryDirectory(dir="./") as temp_target_dir:
            transform.transform_xml_to_json(temp_source_dir, temp_target_dir)
            # The output file is expected to carry the input's base name with a .json suffix.
            with open(os.path.join(temp_target_dir, "test.json")) as file:
                json_output = json.load(file)
    assert json_output == {"test": {"message": "Hello World!"}}
# Module-level V1_Transformer instance; tests call its methods through the name ``transform``.
transform = V1_Transformer()
def test_parse_stakeholder_org_hidden_in_person() -> None:
@ -285,20 +266,6 @@ def test_loc_from_beteiligung_combine() -> None:
assert transform.loc_from_beteiligung(data) == expected_result
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        (None, None),
        ("Ludwig-Ganghofer-Str.", "Ludwig-Ganghofer-Straße"),
        ("Ludwig-Ganghofer-Strasse", "Ludwig-Ganghofer-Straße"),
        ("Str. des Tests", "Straße des Tests"),
    ],
)
def test_normalize_street(value: str, expected_result: str) -> None:
    """Check ``transform.normalize_street`` against each parametrized case.

    Args:
        value (str): Street input handed to the function (one case passes ``None``).
        expected_result (str): Value the function is expected to return.
    """
    result = transform.normalize_street(value)
    assert result == expected_result
def test_name_from_beteiligung() -> None:
data = {
"XJustiz_Daten": {
@ -601,28 +568,6 @@ def test_map_business_purpose_no_result() -> None:
assert result is None
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        ("", None),
        ("Tag der ersten Eintragung: 01.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.05.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 1.5.2004", "2004-05-01"),
        ("Tag der ersten Eintragung: 01.5.2004", "2004-05-01"),
        ("Gesellschaftsvertrag vom 06.04.2016 Hallo Welt", "2016-04-06"),
        ("Str. des Tests vom 1999-04-05", "1999-04-05"),
        ("Once upon a midnight dreary while I pondered weak and weary...", None),
        # Two dates in one text: no result is expected.
        (
            "This company was first founded in 2016-06-10 and then again on 1.5.2004",
            None,
        ),
    ],
)
def test_extract_date_from_string(value: str, expected_result: str) -> None:
    """Check ``transform.extract_date_from_string`` against each parametrized case.

    Args:
        value (str): Free text handed to the function.
        expected_result (str): Expected ISO date string, or ``None``.
    """
    result = transform.extract_date_from_string(value)
    assert result == expected_result
def test_map_founding_date_from_tag_der_ersten_eintragung() -> None:
data = {
"some entry": "Tag der ersten Eintragung: 01.05.2004",
@ -709,112 +654,35 @@ def test_map_last_update() -> None:
assert result == date
@pytest.mark.parametrize(
    ("value", "expected_result"),
    [
        # "c/o <name>" in front of the street, location otherwise empty.
        (
            {
                "location": Location(
                    "", "c/o Youco24 Business Center, Abc ffda", None, None
                ),
                "relationships": [],
            },
            {
                "location": Location("", "Abc ffda", None, None),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("", "Abc ffda", None, None),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # "c/o <name>" in front of the street, fully populated location.
        (
            {
                "location": Location(
                    "Iserlohn", "c/o Youco24 Business Center, Abc Str.", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # "c/o <name>" behind the street.
        (
            {
                "location": Location(
                    "Iserlohn", "Abc Str., c/o Youco24 Business Center", "42", "58644"
                ),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [
                    CompanyToCompanyRelationship(
                        RelationshipRoleEnum.CARE_OF,  # type: ignore
                        Location("Iserlohn", "Abc Str.", "42", "58644"),
                        CompanyRelationshipEnum.COMPANY,
                        "Youco24 Business Center",
                    )
                ],
            },
        ),
        # Bare "c/o" without a name: no relationship is expected.
        (
            {
                "location": Location("Iserlohn", "Abc Str., c/o", "42", "58644"),
                "relationships": [],
            },
            {
                "location": Location("Iserlohn", "Abc Str.", "42", "58644"),
                "relationships": [],
            },
        ),
    ],
)
def test_map_co_relation(value: dict, expected_result: dict) -> None:
    """Check ``transform.map_co_relation`` against each parametrized case.

    Args:
        value (dict): Input with a ``location`` and a ``relationships`` entry.
        expected_result (dict): Expected output with the "c/o" part removed from the street.
    """
    result = transform.map_co_relation(value)
    assert result == expected_result
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_co_relation"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.map_co_relation"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_company_id"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_company_id"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.name_from_beteiligung"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.name_from_beteiligung"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.loc_from_beteiligung"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.loc_from_beteiligung"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_last_update"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_last_update"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_rechtsform"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_rechtsform"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_capital"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_capital"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_business_purpose"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_business_purpose"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_founding_date"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.map_founding_date"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.parse_stakeholder"
"aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1.v1.V1_Transformer.parse_stakeholder"
)
def test_map_unternehmensregister_json( # noqa: PLR0913
mock_map_parse_stakeholder: Mock,

View File

@ -0,0 +1,731 @@
"""Testing utils/data_extraction/unternehmensregister/transform.py."""
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3 import (
V3_Transformer,
)
# Module-level V3_Transformer instance; the tests below call its methods through ``transform``.
transform = V3_Transformer()
def test_parse_stakeholder_org_hidden_in_person() -> None:
    """A natural-person entry with only a quote-prefixed surname is expected to map to a company.

    The expected relationship carries the name without the leading double quote.
    """
    person_entry = {
        "tns:vollerName": {"tns:nachname": '"Some Company KG'},
        "tns:anschrift": {"tns:ort": "Area 51"},
    }
    stakeholder = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {"tns:natuerlichePerson": person_entry}
        },
        "tns:rolle": {"tns:rollenbezeichnung": {"code": "275"}},
    }
    expected = CompanyToCompanyRelationship(
        role=RelationshipRoleEnum.KOMMANDITIST,  # type: ignore
        name="Some Company KG",
        type=CompanyRelationshipEnum.COMPANY,
        location=Location(city="Area 51"),
    )
    parsed = transform.parse_stakeholder(stakeholder)
    assert parsed == expected
def test_parse_stakeholder_person() -> None:
    """A natural person with name, city and birth date is expected to map to a person relationship."""
    person_entry = {
        "tns:vollerName": {
            "tns:vorname": "Stephen",
            "tns:nachname": "King",
        },
        "tns:anschrift": {"tns:ort": "Maine"},
        "tns:geburt": {"tns:geburtsdatum": "1947-09-21"},
    }
    stakeholder = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {"tns:natuerlichePerson": person_entry}
        },
        "tns:rolle": {"tns:rollenbezeichnung": {"code": "269"}},
    }
    expected = PersonToCompanyRelationship(
        role=RelationshipRoleEnum.GESCHAEFTSLEITER,  # type: ignore
        date_of_birth="1947-09-21",
        name=PersonName(firstname="Stephen", lastname="King"),
        type=CompanyRelationshipEnum.PERSON,
        location=Location(city="Maine"),
    )
    parsed = transform.parse_stakeholder(stakeholder)
    assert parsed == expected
def test_parse_stakeholder_person_missing_date_of_birth() -> None:
    """A natural person without a ``tns:geburt`` entry is expected to get ``date_of_birth=None``."""
    person_entry = {
        "tns:vollerName": {
            "tns:vorname": "Stephen",
            "tns:nachname": "King",
        },
        "tns:anschrift": {"tns:ort": "Maine"},
    }
    stakeholder = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {"tns:natuerlichePerson": person_entry}
        },
        "tns:rolle": {"tns:rollenbezeichnung": {"code": "269"}},
    }
    expected = PersonToCompanyRelationship(
        role=RelationshipRoleEnum.GESCHAEFTSLEITER,  # type: ignore
        date_of_birth=None,
        name=PersonName(firstname="Stephen", lastname="King"),
        type=CompanyRelationshipEnum.PERSON,
        location=Location(city="Maine"),
    )
    parsed = transform.parse_stakeholder(stakeholder)
    assert parsed == expected
def test_parse_stakeholder_org() -> None:
    """An organisation with a ``tns:anschrift`` address is expected to map to a company relationship."""
    organisation = {
        "tns:bezeichnung": {"tns:bezeichnung.aktuell": "Transparenzregister kG"},
        "tns:anschrift": {
            "tns:ort": "Iserlohn",
            "tns:strasse": "Hauptstrasse",
            "tns:hausnummer": "42",
            "tns:postleitzahl": "58636",
        },
    }
    stakeholder = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {"tns:organisation": organisation}
        },
        "tns:rolle": {"tns:rollenbezeichnung": {"code": "268"}},
    }
    expected = CompanyToCompanyRelationship(
        name="Transparenzregister kG",
        role=RelationshipRoleEnum.DIREKTOR,  # type: ignore
        type=CompanyRelationshipEnum.COMPANY,
        location=Location(
            city="Iserlohn",
            zip_code="58636",
            house_number="42",
            street="Hauptstrasse",
        ),
    )
    parsed = transform.parse_stakeholder(stakeholder)
    assert parsed == expected
def test_parse_stakeholder_org_loc_from_sitz() -> None:
    """An organisation whose address sits under ``tns:sitz`` is expected to yield the same location."""
    organisation = {
        "tns:bezeichnung": {"tns:bezeichnung.aktuell": "Transparenzregister kG"},
        "tns:sitz": {
            "tns:ort": "Iserlohn",
            "tns:strasse": "Hauptstrasse",
            "tns:hausnummer": "42",
            "tns:postleitzahl": "58636",
        },
    }
    stakeholder = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {"tns:organisation": organisation}
        },
        "tns:rolle": {"tns:rollenbezeichnung": {"code": "268"}},
    }
    expected = CompanyToCompanyRelationship(
        name="Transparenzregister kG",
        role=RelationshipRoleEnum.DIREKTOR,  # type: ignore
        type=CompanyRelationshipEnum.COMPANY,
        location=Location(
            city="Iserlohn",
            zip_code="58636",
            house_number="42",
            street="Hauptstrasse",
        ),
    )
    parsed = transform.parse_stakeholder(stakeholder)
    assert parsed == expected
def test_parse_stakeholder_no_result() -> None:
    """An empty ``tns:auswahl_beteiligter`` selection is expected to yield ``None``."""
    empty_stakeholder: dict = {"tns:beteiligter": {"tns:auswahl_beteiligter": {}}}  # type: ignore
    parsed = transform.parse_stakeholder(empty_stakeholder)
    assert parsed is None
def test_loc_from_beteiligung() -> None:
    """A complete organisation address is expected to map field by field onto ``Location``."""
    address = {
        "tns:strasse": "Gewerbestraße",
        "tns:hausnummer": "8",
        "tns:postleitzahl": "72535",
        "tns:ort": "Heroldstatt",
    }
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {"tns:anschrift": address},
            }
        }
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    expected = Location(
        city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535"
    )
    located = transform.loc_from_beteiligung(data)
    assert located == expected
def test_loc_from_beteiligung_number_contained_in_street() -> None:
    """A house number glued to the street name is expected to be split into its own field."""
    address = {
        "tns:strasse": "Gewerbestraße8",
        "tns:postleitzahl": "72535",
        "tns:ort": "Heroldstatt",
    }
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {"tns:anschrift": address},
            }
        }
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    expected = Location(
        city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535"
    )
    located = transform.loc_from_beteiligung(data)
    assert located == expected
def test_loc_from_beteiligung_no_result() -> None:
    """An address without a street is expected to give ``None`` for street and house number."""
    address = {
        "tns:postleitzahl": "72535",
        "tns:ort": "Heroldstatt",
    }
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {"tns:anschrift": address},
            }
        }
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    expected = Location(
        city="Heroldstatt", house_number=None, street=None, zip_code="72535"
    )
    located = transform.loc_from_beteiligung(data)
    assert located == expected
def test_loc_from_beteiligung_combine() -> None:
    """A number inside the street plus a separate suffix is expected to be combined.

    "Pliangenserstr. 40" with house number "a" should give street
    "Pliangenserstraße" and house number "40a".
    """
    address = {
        "tns:postleitzahl": "72535",
        "tns:strasse": "Pliangenserstr. 40",
        "tns:hausnummer": "a",
        "tns:ort": "Heroldstatt",
    }
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {"tns:anschrift": address},
            }
        }
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    expected = Location(
        city="Heroldstatt",
        house_number="40a",
        street="Pliangenserstraße",
        zip_code="72535",
    )
    located = transform.loc_from_beteiligung(data)
    assert located == expected
def test_name_from_beteiligung() -> None:
    """The current designation of the first participation is expected to be returned unchanged."""
    company_name = "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {
                    "tns:bezeichnung": {
                        "tns:bezeichnung.aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
                    },
                },
            }
        },
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    assert transform.name_from_beteiligung(data) == company_name
def test_name_from_beteiligung_remove_quotes() -> None:
    """Double quotes wrapped around the designation are expected to be stripped."""
    participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {
                    "tns:bezeichnung": {
                        "tns:bezeichnung.aktuell": '"Siemes Verwaltungs-GmbH"'
                    },
                },
            }
        },
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {"tns:beteiligung": [participation]}
        }
    }
    extracted = transform.name_from_beteiligung(data)
    assert extracted == "Siemes Verwaltungs-GmbH"
def test_map_rechtsform() -> None:
    """A legal-form code in the register data is expected to map to ``CompanyTypeEnum.GMBH``."""
    legal_form = {
        "tns:rechtsform": {"code": "Gesellschaft mit beschränkter Haftung"},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:basisdatenRegister": {
                "tns:rechtstraeger": {"tns:angabenZurRechtsform": legal_form},
            }
        }
    }
    # The company name is left empty so only the register data can drive the result.
    mapped = transform.map_rechtsform("", data)
    assert mapped == CompanyTypeEnum.GMBH
def test_map_rechtsform_from_name() -> None:
    """With empty register data the legal form is expected to be derived from the company name."""
    expected_by_name = {
        "GEA Farm Technologies GmbH": "Gesellschaft mit beschränkter Haftung",
        "Atos SE": "Europäische Aktiengesellschaft (SE)",
        "Bilkenroth KG": "Kommanditgesellschaft",
        # A name without any recognisable legal form is expected to give no result.
        "jfoiahfo8sah 98548902 öhz ö": None,
    }
    for company_name, expected in expected_by_name.items():
        assert transform.map_rechtsform(company_name, {}) == expected
def test_map_capital_kg_single() -> None:
    """A KG with a single limited-partner entry is expected to yield that entry's capital."""
    expected_capital = Capital(
        currency=CurrencyEnum.EURO, value=69000, type=CapitalTypeEnum.HAFTEINLAGE  # type: ignore
    )
    limited_partner = {
        "tns:hafteinlage": {
            "tns:zahl": str(expected_capital.value),
            "tns:waehrung": {"code": expected_capital.currency},
        },
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:personengesellschaft": {
                    "tns:zusatzKG": {"tns:datenKommanditist": limited_partner}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.KG)  # type: ignore
    assert mapped == expected_capital
def test_map_capital_kg_sum() -> None:
    """A KG with two limited-partner entries of 10000 each is expected to yield 20000."""
    expected_capital = Capital(
        currency=CurrencyEnum.EURO, value=20000, type=CapitalTypeEnum.HAFTEINLAGE  # type: ignore
    )
    # Two separate but equal entries; the comprehension builds fresh dicts for each.
    limited_partners = [
        {
            "tns:hafteinlage": {
                "tns:zahl": str(10000),
                "tns:waehrung": {"code": expected_capital.currency},
            }
        }
        for _ in range(2)
    ]
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:personengesellschaft": {
                    "tns:zusatzKG": {"tns:datenKommanditist": limited_partners}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.KG)  # type: ignore
    assert mapped == expected_capital
def test_map_capital_no_fachdaten() -> None:
    """An empty ``tns:fachdatenRegister`` section is expected to yield ``None``."""
    empty_register: dict = {"tns:fachdatenRegister": {}}
    mapped = transform.map_capital(empty_register, CompanyTypeEnum.KG)  # type: ignore
    assert mapped is None
def test_map_capital_gmbh() -> None:
    """A GmbH's ``tns:stammkapital`` entry is expected to map to a STAMMKAPITAL capital."""
    expected_capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {
        "tns:zahl": str(expected_capital.value),
        "tns:waehrung": {"code": expected_capital.currency},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:kapitalgesellschaft": {
                    "tns:zusatzGmbH": {"tns:stammkapital": share_capital}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.GMBH)  # type: ignore
    assert mapped == expected_capital
def test_map_capital_ag() -> None:
    """A ``tns:grundkapital`` entry, queried with company type SE, is expected to map to GRUNDKAPITAL."""
    expected_capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.GRUNDKAPITAL  # type: ignore
    )
    amount = {
        "tns:zahl": str(expected_capital.value),
        "tns:waehrung": {"code": expected_capital.currency},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:kapitalgesellschaft": {
                    "tns:zusatzAktiengesellschaft": {
                        "tns:grundkapital": {"tns:hoehe": amount},
                    }
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.SE)  # type: ignore
    assert mapped == expected_capital
def test_map_capital_personengesellschaft() -> None:
    """An OHG with a ``tns:stammkapital`` entry under ``tns:personengesellschaft`` is expected to map."""
    expected_capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {
        "tns:zahl": str(expected_capital.value),
        "tns:waehrung": {"code": expected_capital.currency},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:personengesellschaft": {
                    "tns:zusatzGmbH": {"tns:stammkapital": share_capital}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.OHG)  # type: ignore
    assert mapped == expected_capital
def test_map_capital_einzelkaufmann() -> None:
    """Querying with company type EINZELKAUFMANN is expected to yield ``None`` for this input."""
    # Only used as a source for the value and currency placed in the input data.
    sample_capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {
        "tns:zahl": str(sample_capital.value),
        "tns:waehrung": {"code": sample_capital.currency},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "Personengesellschaft": {
                    "tns:zusatzGmbH": {"tns:stammkapital": share_capital}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.EINZELKAUFMANN)  # type: ignore
    assert mapped is None
def test_map_capital_partial_null_values() -> None:
    """A capital entry whose ``tns:zahl`` is ``None`` is expected to yield ``None``."""
    # Only used as a source for the currency placed in the input data.
    sample_capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {
        "tns:zahl": None,
        "tns:waehrung": {"code": sample_capital.currency},
    }
    data = {
        "tns:fachdatenRegister": {
            "tns:auswahl_zusatzangaben": {
                "tns:personengesellschaft": {
                    "tns:zusatzGmbH": {"tns:stammkapital": share_capital}
                }
            }
        }
    }
    mapped = transform.map_capital(data, CompanyTypeEnum.OHG)  # type: ignore
    assert mapped is None
def test_map_business_purpose() -> None:
    """The ``tns:gegenstand`` text is expected to be returned as the business purpose."""
    purpose_text = "Handel mit Betäubungsmitteln aller Art"
    register_data = {
        "tns:fachdatenRegister": {
            "tns:basisdatenRegister": {"tns:gegenstand": purpose_text}
        }
    }
    assert transform.map_business_purpose(register_data) == purpose_text
def test_map_business_purpose_no_result() -> None:
    """An empty input dict is expected to yield ``None``."""
    empty_input: dict = {}
    assert transform.map_business_purpose(empty_input) is None
def test_map_founding_date_from_tag_der_ersten_eintragung() -> None:
    """A "Tag der ersten Eintragung" text entry is expected to give the founding date."""
    entries = {
        "some entry": "Tag der ersten Eintragung: 01.05.2004",
        "some other entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
    }
    founding_date = transform.map_founding_date(entries)
    assert founding_date == "2004-05-01"
def test_map_founding_date_from_gesellschaftsvertrag() -> None:
    """A "Gesellschaftsvertrag vom ..." text entry is expected to give the founding date."""
    entries = {
        "some entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
        "some other entry": "Das Wesen der Rekursion ist der Selbstaufruf Gesellschaftsvertrag vom 22.12.1996 Hallo Welt",
    }
    founding_date = transform.map_founding_date(entries)
    assert founding_date == "1996-12-22"
def test_map_founding_date_from_gruendungsdatum() -> None:
    """The ``tns:aktuellesSatzungsdatum`` value is expected to be returned as the founding date."""
    statute_date = {"tns:aktuellesSatzungsdatum": "1998-01-01"}
    data = {
        "tns:fachdatenRegister": {
            "tns:basisdatenRegister": {"tns:satzungsdatum": statute_date}
        }
    }
    founding_date = transform.map_founding_date(data)
    assert founding_date == "1998-01-01"
def test_map_founding_date_no_result() -> None:
    """An empty ``tns:basisdatenRegister`` section is expected to yield ``None``."""
    empty_register: dict = {"tns:fachdatenRegister": {"tns:basisdatenRegister": {}}}
    assert transform.map_founding_date(empty_register) is None
def test_map_company_id() -> None:
    """The file reference and the second participation's court data are expected to form the ID."""
    court = DistrictCourt("Amtsgericht Ulm", "Ulm")
    expected_id = CompanyID(court, "HRA 4711")
    instance_data = {
        "tns:aktenzeichen": {
            "tns:auswahl_aktenzeichen": {
                "tns:aktenzeichen.freitext": expected_id.hr_number
            }
        },
    }
    court_participation = {
        "tns:beteiligter": {
            "tns:auswahl_beteiligter": {
                "tns:organisation": {
                    "tns:bezeichnung": {"tns:bezeichnung.aktuell": court.name},
                    "tns:anschrift": {
                        "tns:ort": court.city,
                    },
                }
            }
        },
    }
    data = {
        "tns:grunddaten": {
            "tns:verfahrensdaten": {
                "tns:instanzdaten": instance_data,
                # The court entry sits at index 1; index 0 is an empty placeholder.
                "tns:beteiligung": [{}, court_participation],
            },
        },
    }
    mapped_id = transform.map_company_id(data)
    assert mapped_id == expected_id
def test_map_last_update() -> None:
    """The ``tns:letzteEintragung`` value is expected to be returned as the last update."""
    last_entry = "2024-01-01"
    register_data = {
        "tns:fachdatenRegister": {"tns:auszug": {"tns:letzteEintragung": last_entry}}
    }
    assert transform.map_last_update(register_data) == last_entry
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.map_co_relation"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_company_id"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.name_from_beteiligung"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.loc_from_beteiligung"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_last_update"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_rechtsform"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_capital"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_business_purpose"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.map_founding_date"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.v3.V3_Transformer.parse_stakeholder"
)
def test_map_unternehmensregister_json(  # noqa: PLR0913
    mock_map_parse_stakeholder: Mock,
    mock_map_founding_date: Mock,
    mock_map_business_purpose: Mock,
    mock_map_capital: Mock,
    mock_map_rechtsform: Mock,
    mock_map_last_update: Mock,
    mock_loc_from_beteiligung: Mock,
    mock_map_name_from_beteiligung: Mock,
    mock_map_company_id: Mock,
    mock_map_co_relation: Mock,
) -> None:
    """Check that the mapper assembles a ``Company`` from the patched helper results.

    Every helper is patched; the mocks arrive bottom-up, i.e. the decorator
    closest to the function supplies the first argument.

    Args:
        mock_map_parse_stakeholder (Mock): Patch of ``V3_Transformer.parse_stakeholder``.
        mock_map_founding_date (Mock): Patch of ``V3_Transformer.map_founding_date``.
        mock_map_business_purpose (Mock): Patch of ``V3_Transformer.map_business_purpose``.
        mock_map_capital (Mock): Patch of ``V3_Transformer.map_capital``.
        mock_map_rechtsform (Mock): Patch of ``V3_Transformer.map_rechtsform``.
        mock_map_last_update (Mock): Patch of ``V3_Transformer.map_last_update``.
        mock_loc_from_beteiligung (Mock): Patch of ``V3_Transformer.loc_from_beteiligung``.
        mock_map_name_from_beteiligung (Mock): Patch of ``V3_Transformer.name_from_beteiligung``.
        mock_map_company_id (Mock): Patch of ``V3_Transformer.map_company_id``.
        mock_map_co_relation (Mock): Patch of the module-level ``map_co_relation``.
    """
    company_fields = {
        "id": Mock(),
        "name": Mock(),
        "location": Mock(),
        "last_update": Mock(),
        "company_type": Mock(),
        "capital": Mock(),
        "business_purpose": Mock(),
        "founding_date": Mock(),
        "relationships": [Mock()],
    }
    expected = Company(**company_fields)  # type: ignore
    # Each patched helper hands back the matching field of the expected company.
    stubbed_values = (
        (mock_map_company_id, expected.id),
        (mock_map_name_from_beteiligung, expected.name),
        (mock_loc_from_beteiligung, expected.location),
        (mock_map_last_update, expected.last_update),
        (mock_map_rechtsform, expected.company_type),
        (mock_map_capital, expected.capital),
        (mock_map_business_purpose, expected.business_purpose),
        (mock_map_founding_date, expected.founding_date),
        (mock_map_parse_stakeholder, expected.relationships[0]),
    )
    for patched_helper, stubbed_value in stubbed_values:
        patched_helper.return_value = stubbed_value
    # Pass-through: the care-of mapping returns its argument unchanged.
    mock_map_co_relation.side_effect = lambda x: x
    payload: dict = {
        "rootLayerWithSomeStuipStringNooneCaresAbout": {
            "tns:grunddaten": {"tns:verfahrensdaten": {"tns:beteiligung": [{}, {}, {}]}}
        }
    }
    mapped = transform.map_unternehmensregister_json(payload)
    assert mapped == expected