diff --git a/src/aki_prj23_transparenzregister/models/company.py b/src/aki_prj23_transparenzregister/models/company.py index 3ad0591..94129f4 100644 --- a/src/aki_prj23_transparenzregister/models/company.py +++ b/src/aki_prj23_transparenzregister/models/company.py @@ -124,7 +124,7 @@ class PersonToCompanyRelationship(CompanyRelationship): """Extension of CompanyRelationship with extras for Person.""" name: PersonName - date_of_birth: str + date_of_birth: str | None @dataclass diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py index 2a34b0d..5b78278 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform.py @@ -1,508 +1,520 @@ -"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading.""" -import dataclasses -import glob -import json -import os -import re -import sys - -import xmltodict -from tqdm import tqdm - -from aki_prj23_transparenzregister.models.company import ( - Capital, - CapitalTypeEnum, - Company, - CompanyID, - CompanyRelationship, - CompanyRelationshipEnum, - CompanyToCompanyRelationship, - CompanyTypeEnum, - CurrencyEnum, - DistrictCourt, - Location, - PersonName, - PersonToCompanyRelationship, - RelationshipRoleEnum, -) -from aki_prj23_transparenzregister.utils.string_tools import ( - remove_traling_and_leading_quotes, - transform_date_to_iso, -) - - -def transform_xml_to_json(source_dir: str, target_dir: str) -> None: - """Convert all xml files in a directory to json files. 
- - Args: - source_dir (str): Directory hosting the xml files - target_dir (str): Target directory to move json files to - """ - for source_path in [ - os.path.normpath(i) for i in glob.glob(source_dir + "**/*.xml", recursive=True) - ]: - target_path = os.path.join( - target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json") - ) - - with open(source_path, encoding="utf-8") as source_file: - # deepcode ignore HandleUnicode: Weird XML format no other solution - data = xmltodict.parse(source_file.read().encode()) - with open(target_path, "w", encoding="utf-8") as json_file: - json_file.write(json.dumps(data)) - - -def parse_stakeholder(data: dict) -> CompanyRelationship | None: - """Extract the company stakeholder/relation from a single "Beteiligung". - - Args: - data (dict): Data export - - Returns: - CompanyRelationship | None: Relationship if it could be processed - """ - if "Natuerliche_Person" in data["Beteiligter"]: - # It's a Company serving as a "Kommanditist" or similar - if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None: - return CompanyToCompanyRelationship( - **{ # type: ignore - "name": remove_traling_and_leading_quotes( - data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][ - "Nachname" - ] - ), - "location": Location( - **{ - "city": data["Beteiligter"]["Natuerliche_Person"][ - "Anschrift" - ][-1]["Ort"] - if isinstance( - data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], - list, - ) - else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ - "Ort" - ] - } - ), - "role": RelationshipRoleEnum( - data["Rolle"]["Rollenbezeichnung"]["content"] - ), - "type": CompanyRelationshipEnum.COMPANY, - } - ) - return PersonToCompanyRelationship( - **{ - "name": PersonName( - **{ - "firstname": data["Beteiligter"]["Natuerliche_Person"][ - "Voller_Name" - ]["Vorname"], - "lastname": data["Beteiligter"]["Natuerliche_Person"][ - "Voller_Name" - ]["Nachname"], - } - ), - "date_of_birth": 
data["Beteiligter"]["Natuerliche_Person"]["Geburt"][ - "Geburtsdatum" - ] - if "Geburt" in data["Beteiligter"]["Natuerliche_Person"] - else None, - "location": Location( - **{ - "city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ - -1 - ]["Ort"] - if isinstance( - data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list - ) - else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ - "Ort" - ] - } - ), - "role": RelationshipRoleEnum( - data["Rolle"]["Rollenbezeichnung"]["content"] - ), - "type": CompanyRelationshipEnum.PERSON, - } - ) - if "Organisation" in data["Beteiligter"]: - return CompanyToCompanyRelationship( - **{ # type: ignore - "role": RelationshipRoleEnum( - data["Rolle"]["Rollenbezeichnung"]["content"] - ), - "name": remove_traling_and_leading_quotes( - data["Beteiligter"]["Organisation"]["Bezeichnung"][ - "Bezeichnung_Aktuell" - ] - ), - "location": Location( - **{ - "city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"], - "street": data["Beteiligter"]["Organisation"]["Anschrift"][ - "Strasse" - ] - if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"] - else None, - "house_number": data["Beteiligter"]["Organisation"][ - "Anschrift" - ]["Hausnummer"] - if "Hausnummer" - in data["Beteiligter"]["Organisation"]["Anschrift"] - else None, - "zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][ - "Postleitzahl" - ] - if "Postleitzahl" - in data["Beteiligter"]["Organisation"]["Anschrift"] - else None, - } - ), - "type": CompanyRelationshipEnum.COMPANY, - } - ) - return None - - -def normalize_street(street: str) -> str: - """Normalize street names by extending them to `Straße` or `straße`. 
- - Args: - street (str): Name of street - - Returns: - str: Normalized street name - """ - if street is None: - return None - regex = r"(Str\.|Strasse)" - street = re.sub(regex, "Straße", street) - regex = r"(str\.|strasse)" - street = re.sub(regex, "straße", street) - return street.strip() - - -def loc_from_beteiligung(data: dict) -> Location: - """Extract the company location from the first relationship in the export. - - Args: - data (dict): Data export - - Returns: - Location: location - """ - base = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][ - "Beteiligter" - ]["Organisation"]["Anschrift"] - - house_number = None - street = None - if "Strasse" in base: - regex = r".(\d+)$" - hits = re.findall(regex, base["Strasse"]) - if len(hits) == 1: - house_number = hits[0] - street = base["Strasse"][: (-1 * len(house_number))] - if "Hausnummer" in base: - house_number = house_number + base["Hausnummer"] - else: - if "Hausnummer" in base: - house_number = base["Hausnummer"] - street = base["Strasse"] - return Location( - **{ - "city": base["Ort"], - "zip_code": base["Postleitzahl"], - "street": normalize_street(street), # type: ignore - "house_number": house_number, - } - ) - - -def name_from_beteiligung(data: dict) -> str: - """Extract the Company name from an Unternehmensregister export by using the first relationship found. - - Args: - data (dict): Data export - - Returns: - str: Company name - """ - name = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][ - "Beteiligter" - ]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"] - return remove_traling_and_leading_quotes(name) - - -def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None: - """Extracts the company type from a given Unternehmensregister export. 
- - Args: - company_name (str): Name of the company as a fallback solution - data (dict): Data export - - Returns: - CompanyTypeEnum | None: Company type if found - """ - try: - return CompanyTypeEnum( - data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][ - "Rechtstraeger" - ]["Rechtsform"]["content"] - ) - except KeyError: - if ( - company_name.endswith("GmbH") - or company_name.endswith("UG") - or company_name.endswith("UG (haftungsbeschränkt)") - ): - return CompanyTypeEnum("Gesellschaft mit beschränkter Haftung") - if company_name.endswith("SE"): - return CompanyTypeEnum("Europäische Aktiengesellschaft (SE)") - if company_name.endswith("KG"): - return CompanyTypeEnum("Kommanditgesellschaft") - return None - - -def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None: - """Extracts the company capital from the given Unternehmensregister export. - - Args: - data (dict): Data export - company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung') - - Returns: - Capital | None: Company Capital if found - """ - # Early return - if "Zusatzangaben" not in data["XJustiz_Daten"]["Fachdaten_Register"]: - return None - capital: dict = {"Zahl": 0.0, "Waehrung": ""} - if company_type == CompanyTypeEnum.KG: - capital_type = "Hafteinlage" - base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][ - "Personengesellschaft" - ]["Zusatz_KG"]["Daten_Kommanditist"] - if isinstance(base, list): - for entry in base: - # TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below - capital["Zahl"] = capital["Zahl"] + float(entry["Hafteinlage"]["Zahl"]) - capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"] - elif isinstance(base, dict): - capital = base["Hafteinlage"] - elif company_type in [ - CompanyTypeEnum.GMBH, - CompanyTypeEnum.SE, - CompanyTypeEnum.AG, - CompanyTypeEnum.KGaA, - CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM, - CompanyTypeEnum.OHG, - ]: - if ( - 
"Kapitalgesellschaft" - not in data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"] - ): - base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][ - "Personengesellschaft" - ] - else: - base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][ - "Kapitalgesellschaft" - ] - if "Zusatz_GmbH" in base: - capital_type = "Stammkapital" - capital = base["Zusatz_GmbH"]["Stammkapital"] - elif "Zusatz_Aktiengesellschaft" in base: - capital_type = "Grundkapital" - capital = base["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"] - elif company_type in [ - CompanyTypeEnum.EINZELKAUFMANN, - CompanyTypeEnum.EG, - CompanyTypeEnum.PARTNERSCHAFT, - CompanyTypeEnum.PARTNERGESELLSCHAFT, - CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT, - None, - ]: - return None - # Catch entries having the dict but with null values - if not all(capital.values()): - return None - return Capital( - **{ # type: ignore - "value": float(capital["Zahl"]), - "currency": CurrencyEnum(capital["Waehrung"]), - "type": CapitalTypeEnum(capital_type), - } - ) - - -def map_business_purpose(data: dict) -> str | None: - """Extracts the "Geschäftszweck" from a given Unternehmensregister export. - - Args: - data (dict): Data export - - Returns: - str | None: Business purpose if found - """ - try: - return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][ - "Gegenstand_oder_Geschaeftszweck" - ] - except KeyError: - return None - - -def map_founding_date(data: dict) -> str | None: - """Extracts the founding date from a given Unternehmensregister export. 
- - Args: - data (dict): Data export - - Returns: - str | None: Founding date if found - """ - text = str(data) - entry_date = re.findall( - r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text - ) - if len(entry_date) == 1: - return transform_date_to_iso(entry_date[0][1]) - - entry_date = re.findall( - r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text - ) - if len(entry_date) == 1: - return transform_date_to_iso(entry_date[0]) - if ( - "Gruendungsmetadaten" - in data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"] - ): - return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][ - "Gruendungsmetadaten" - ]["Gruendungsdatum"] - # No reliable answer - return None - - -def map_company_id(data: dict) -> CompanyID: - """Retrieve Company ID from export. - - Args: - data (dict): Data export - - Returns: - CompanyID: ID of the company - """ - return CompanyID( - **{ - "hr_number": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Instanzdaten" - ]["Aktenzeichen"], - "district_court": DistrictCourt( - **{ - "name": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"]["Organisation"]["Bezeichnung"][ - "Bezeichnung_Aktuell" - ] - if "Organisation" - in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"] - else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][ - "Nachname" - ], - "city": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"]["Organisation"]["Sitz"]["Ort"] - if "Organisation" - in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"] - else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][ - "Beteiligung" - ][1]["Beteiligter"]["Natuerliche_Person"]["Anschrift"]["Ort"], - } - ), - } - ) - - -def map_last_update(data: dict) -> str: - """Extract 
last update date from export. - - Args: - data (dict): Unternehmensregister export - - Returns: - str: Last update date - """ - return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["letzte_Eintragung"] - - -def map_unternehmensregister_json(data: dict) -> Company: - """Processes the Unternehmensregister structured export to a Company by using several helper methods. - - Args: - data (dict): Data export - - Returns: - Company: Transformed data - """ - result: dict = {"relationships": []} - - # TODO Refactor mapping - this is a nightmare... - result["id"] = map_company_id(data) - result["name"] = name_from_beteiligung(data) - - result["location"] = loc_from_beteiligung(data) - result["last_update"] = map_last_update(data) - - result["company_type"] = map_rechtsform(result["name"], data) - result["capital"] = map_capital(data, result["company_type"]) - result["business_purpose"] = map_business_purpose(data) - result["founding_date"] = map_founding_date(data) - - for i in range( - 2, len(data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"]) - ): - people = parse_stakeholder( - data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][i] - ) - result["relationships"].append(people) - return Company(**result) - - -if __name__ == "__main__": - from loguru import logger - - # transform_xml_to_json( - # "./data/Unternehmensregister/scraping/", "./data/Unternehmensregister/export/" - # ) - base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister" - for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")): - path = os.path.join(f"{base_path}/export", file) - with open(path, encoding="utf-8") as file_object: - try: - company: Company = map_unternehmensregister_json( - json.loads(file_object.read()) - ) - - name = "".join(e for e in company.name if e.isalnum())[:50] - - with open( - f"{base_path}/transformed/{name}.json", - "w+", - encoding="utf-8", - ) as export_file: - json.dump( - dataclasses.asdict(company), 
export_file, ensure_ascii=False - ) - except Exception: - logger.error(f"Error in processing {path}") - sys.exit(1) +"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading.""" +import dataclasses +import glob +import json +import os +import re +import sys + +import xmltodict +from tqdm import tqdm + +from aki_prj23_transparenzregister.models.company import ( + Capital, + CapitalTypeEnum, + Company, + CompanyID, + CompanyRelationship, + CompanyRelationshipEnum, + CompanyToCompanyRelationship, + CompanyTypeEnum, + CurrencyEnum, + DistrictCourt, + Location, + PersonName, + PersonToCompanyRelationship, + RelationshipRoleEnum, +) +from aki_prj23_transparenzregister.utils.string_tools import ( + remove_traling_and_leading_quotes, + transform_date_to_iso, +) + + +def transform_xml_to_json(source_dir: str, target_dir: str) -> None: + """Convert all xml files in a directory to json files. + + Args: + source_dir (str): Directory hosting the xml files + target_dir (str): Target directory to move json files to + """ + for source_path in [ + os.path.normpath(i) for i in glob.glob(source_dir + "**/*.xml", recursive=True) + ]: + target_path = os.path.join( + target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json") + ) + + with open(source_path, encoding="utf-8") as source_file: + # deepcode ignore HandleUnicode: Weird XML format no other solution + data = xmltodict.parse(source_file.read().encode()) + with open(target_path, "w", encoding="utf-8") as json_file: + json_file.write(json.dumps(data)) + + +def parse_date_of_birth(data: dict) -> str | None: + """Retreives the date of birth from a stakeholder entry if possible. 
+ + Args: + data (dict): Stakeholder data + + Returns: + str | None: date of birth or None if not found + """ + if "Geburt" in (base := data["Beteiligter"]["Natuerliche_Person"]): + base = base["Geburt"]["Geburtsdatum"] + if isinstance(base, str): + return base + return None + + +def parse_stakeholder(data: dict) -> CompanyRelationship | None: + """Extract the company stakeholder/relation from a single "Beteiligung". + + Args: + data (dict): Data export + + Returns: + CompanyRelationship | None: Relationship if it could be processed + """ + if "Natuerliche_Person" in data["Beteiligter"]: + # It's a Company serving as a "Kommanditist" or similar + if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None: + return CompanyToCompanyRelationship( + **{ # type: ignore + "name": remove_traling_and_leading_quotes( + data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][ + "Nachname" + ] + ), + "location": Location( + **{ + "city": data["Beteiligter"]["Natuerliche_Person"][ + "Anschrift" + ][-1]["Ort"] + if isinstance( + data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], + list, + ) + else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ + "Ort" + ] + } + ), + "role": RelationshipRoleEnum( + data["Rolle"]["Rollenbezeichnung"]["content"] + ), + "type": CompanyRelationshipEnum.COMPANY, + } + ) + return PersonToCompanyRelationship( + **{ # type: ignore + "name": PersonName( + **{ + "firstname": data["Beteiligter"]["Natuerliche_Person"][ + "Voller_Name" + ]["Vorname"], + "lastname": data["Beteiligter"]["Natuerliche_Person"][ + "Voller_Name" + ]["Nachname"], + } + ), + "date_of_birth": parse_date_of_birth(data), + "location": Location( + **{ + "city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ + -1 + ]["Ort"] + if isinstance( + data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list + ) + else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][ + "Ort" + ] + } + ), + "role": RelationshipRoleEnum( + 
data["Rolle"]["Rollenbezeichnung"]["content"] + ), + "type": CompanyRelationshipEnum.PERSON, + } + ) + if "Organisation" in data["Beteiligter"]: + return CompanyToCompanyRelationship( + **{ # type: ignore + "role": RelationshipRoleEnum( + data["Rolle"]["Rollenbezeichnung"]["content"] + ), + "name": remove_traling_and_leading_quotes( + data["Beteiligter"]["Organisation"]["Bezeichnung"][ + "Bezeichnung_Aktuell" + ] + ), + "location": Location( + **{ + "city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"], + "street": data["Beteiligter"]["Organisation"]["Anschrift"][ + "Strasse" + ] + if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"] + else None, + "house_number": data["Beteiligter"]["Organisation"][ + "Anschrift" + ]["Hausnummer"] + if "Hausnummer" + in data["Beteiligter"]["Organisation"]["Anschrift"] + else None, + "zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][ + "Postleitzahl" + ] + if "Postleitzahl" + in data["Beteiligter"]["Organisation"]["Anschrift"] + else None, + } + ), + "type": CompanyRelationshipEnum.COMPANY, + } + ) + return None + + +def normalize_street(street: str) -> str: + """Normalize street names by extending them to `Straße` or `straße`. + + Args: + street (str): Name of street + + Returns: + str: Normalized street name + """ + if street is None: + return None + regex = r"(Str\.|Strasse)" + street = re.sub(regex, "Straße", street) + regex = r"(str\.|strasse)" + street = re.sub(regex, "straße", street) + return street.strip() + + +def loc_from_beteiligung(data: dict) -> Location: + """Extract the company location from the first relationship in the export. 
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
    """Determine the company type from a given Unternehmensregister export.

    The explicit `Rechtsform` entry of the export is used when present. If the
    lookup fails with a `KeyError`, the type is derived from the suffix of the
    company name instead.

    Args:
        company_name (str): Name of the company as a fallback solution
        data (dict): Data export

    Returns:
        CompanyTypeEnum | None: Company type if found
    """
    try:
        register = data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
        legal_form = register["Rechtstraeger"]["Rechtsform"]["content"]
        return CompanyTypeEnum(legal_form)
    except KeyError:
        # Checked in order; the first matching suffix group wins.
        suffix_table = (
            (
                ("GmbH", "UG", "UG (haftungsbeschränkt)"),
                "Gesellschaft mit beschränkter Haftung",
            ),
            (("SE",), "Europäische Aktiengesellschaft (SE)"),
            (("KG",), "Kommanditgesellschaft"),
        )
        for suffixes, type_label in suffix_table:
            if company_name.endswith(suffixes):
                return CompanyTypeEnum(type_label)
        return None
"Kapitalgesellschaft" + not in data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"] + ): + base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][ + "Personengesellschaft" + ] + else: + base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][ + "Kapitalgesellschaft" + ] + if "Zusatz_GmbH" in base: + capital_type = "Stammkapital" + capital = base["Zusatz_GmbH"]["Stammkapital"] + elif "Zusatz_Aktiengesellschaft" in base: + capital_type = "Grundkapital" + capital = base["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"] + elif company_type in [ + CompanyTypeEnum.EINZELKAUFMANN, + CompanyTypeEnum.EG, + CompanyTypeEnum.PARTNERSCHAFT, + CompanyTypeEnum.PARTNERGESELLSCHAFT, + CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT, + None, + ]: + return None + # Catch entries having the dict but with null values + if not all(capital.values()): + return None + return Capital( + **{ # type: ignore + "value": float(capital["Zahl"]), + "currency": CurrencyEnum(capital["Waehrung"]), + "type": CapitalTypeEnum(capital_type), + } + ) + + +def map_business_purpose(data: dict) -> str | None: + """Extracts the "Geschäftszweck" from a given Unternehmensregister export. + + Args: + data (dict): Data export + + Returns: + str | None: Business purpose if found + """ + try: + return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][ + "Gegenstand_oder_Geschaeftszweck" + ] + except KeyError: + return None + + +def map_founding_date(data: dict) -> str | None: + """Extracts the founding date from a given Unternehmensregister export. 
def map_company_id(data: dict) -> CompanyID:
    """Build the company ID from an export.

    The register number is read from `Instanzdaten`/`Aktenzeichen`. The
    district court is taken from the `Beteiligung` entry at index 1, which is
    either an `Organisation` or a `Natuerliche_Person`.

    Args:
        data (dict): Data export

    Returns:
        CompanyID: ID of the company
    """
    proceedings = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]
    hr_number = proceedings["Instanzdaten"]["Aktenzeichen"]

    court = proceedings["Beteiligung"][1]["Beteiligter"]
    if "Organisation" in court:
        organisation = court["Organisation"]
        court_name = organisation["Bezeichnung"]["Bezeichnung_Aktuell"]
        court_city = organisation["Sitz"]["Ort"]
    else:
        person = court["Natuerliche_Person"]
        court_name = person["Voller_Name"]["Nachname"]
        # NOTE(review): `Anschrift` is indexed as a dict here, while
        # parse_stakeholder also handles a list of addresses - confirm whether
        # a list can occur for this entry.
        court_city = person["Anschrift"]["Ort"]

    return CompanyID(
        hr_number=hr_number,
        district_court=DistrictCourt(name=court_name, city=court_city),
    )
def map_unternehmensregister_json(data: dict) -> Company:
    """Transform the structured Unternehmensregister export into a Company.

    Each attribute is produced by its dedicated helper. Every `Beteiligung`
    entry from index 2 onwards is parsed as a relationship of the company.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    # TODO Refactor mapping - this is a nightmare...
    company_id = map_company_id(data)
    company_name = name_from_beteiligung(data)

    location = loc_from_beteiligung(data)
    last_update = map_last_update(data)

    company_type = map_rechtsform(company_name, data)
    capital = map_capital(data, company_type)
    business_purpose = map_business_purpose(data)
    founding_date = map_founding_date(data)

    participations = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ]
    # NOTE(review): parse_stakeholder may return None; such entries are kept
    # in the list as-is - confirm that consumers expect this.
    relationships = [parse_stakeholder(entry) for entry in participations[2:]]

    return Company(
        relationships=relationships,
        id=company_id,
        name=company_name,
        location=location,
        last_update=last_update,
        company_type=company_type,
        capital=capital,
        business_purpose=business_purpose,
        founding_date=founding_date,
    )
def test_parse_stakeholder_person_missing_date_of_birth() -> None:
    """A `Geburtsdatum` given as a nil marker instead of a string maps to None."""
    person = {
        "Voller_Name": {"Vorname": "Stephen", "Nachname": "King"},
        "Anschrift": {"Ort": "Maine"},
        "Geburt": {"Geburtsdatum": {"@xsi:nil": "true"}},
    }
    data = {
        "Beteiligter": {"Natuerliche_Person": person},
        "Rolle": {"Rollenbezeichnung": {"content": "Geschäftsleiter(in)"}},
    }

    expected_result = PersonToCompanyRelationship(
        role=RelationshipRoleEnum.GESCHAEFTSLEITER,  # type: ignore
        date_of_birth=None,
        name=PersonName(firstname="Stephen", lastname="King"),
        type=CompanyRelationshipEnum.PERSON,
        location=Location(city="Maine"),
    )

    assert transform.parse_stakeholder(data) == expected_result