checkpoint: Manual role mapping via ID

This commit is contained in:
TrisNol 2023-11-02 16:12:23 +01:00
parent 2d9e3f19f9
commit b7f977138d
4 changed files with 244 additions and 811 deletions

View File

@ -4,6 +4,7 @@ import json
import glob
import argparse
import tempfile
import dataclasses
import pandas as pd
from tqdm import tqdm
from pathlib import Path
@ -43,17 +44,14 @@ if __name__ == "__main__":
configer_logger(namespace=parsed)
config = parsed.config
session = connector.get_session(get_config_provider(config))
# missing_companies = session.query(entities.MissingCompany).all()
missing_companies = ["GEA Farm Technologies"]
missing_companies = session.query(entities.MissingCompany).all()
counter = 0
# Scrape data from unternehmensregister
for company in missing_companies:
print(company)
extract.scrape(company, ["tmp", "xml"])
counter = counter + 1
if counter == 5:
break
# # Scrape data from unternehmensregister
# for company in missing_companies:
# print(company.name)
# extract.scrape(company.name, ["tmp", "xml"])
# Transform input
output_path = os.path.join(str(Path.cwd()), *["tmp", "transformed"])
xml_dir = os.path.join(str(Path.cwd()), *["tmp", "xml"])
@ -66,7 +64,7 @@ if __name__ == "__main__":
path = os.path.join(json_dir, file)
with open(path, encoding="utf-8") as file_object:
try:
company: Company = transform.map_unternehmensregister_json(
company = transform.map_unternehmensregister_json(
json.loads(file_object.read())
)

View File

@ -63,12 +63,23 @@ def parse_date_of_birth(data: dict) -> str | None:
Returns:
str | None: date of birth or None if not found
"""
if "Geburt" in (base := data["Beteiligter"]["Natuerliche_Person"]):
base = base["Geburt"]["Geburtsdatum"]
if "tns:geburt" in (base := data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]):
base = base["tns:geburt"]["tns:geburtsdatum"]
if isinstance(base, str):
return base
return None
def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
match role_id:
case "086":
return RelationshipRoleEnum.GESCHAEFTSFUEHRER
case "285":
return RelationshipRoleEnum.PROKURIST
case "194":
return RelationshipRoleEnum.VORSTAND
case _:
raise KeyError(f'Uknown role_id: {role_id}')
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
"""Extract the company stakeholder/relation from a single "Beteiligung".
@ -79,64 +90,65 @@ def parse_stakeholder(data: dict) -> CompanyRelationship | None:
Returns:
CompanyRelationship | None: Relationship if it could be processed
"""
if "Natuerliche_Person" in data["Beteiligter"]:
if "tns:natuerlichePerson" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
# It's a Company serving as a "Kommanditist" or similar
if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
return CompanyToCompanyRelationship(
**{ # type: ignore
"name": remove_traling_and_leading_quotes(
data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][
"Nachname"
]
),
"location": Location(
**{
"city": data["Beteiligter"]["Natuerliche_Person"][
"Anschrift"
][-1]["Ort"]
if isinstance(
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
list,
)
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
"Ort"
]
}
),
"role": RelationshipRoleEnum(
data["Rolle"]["Rollenbezeichnung"]["content"]
),
"type": CompanyRelationshipEnum.COMPANY,
}
)
# if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
# return CompanyToCompanyRelationship(
# **{ # type: ignore
# "name": remove_traling_and_leading_quotes(
# data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][
# "Nachname"
# ]
# ),
# "location": Location(
# **{
# "city": data["Beteiligter"]["Natuerliche_Person"][
# "Anschrift"
# ][-1]["Ort"]
# if isinstance(
# data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
# list,
# )
# else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
# "Ort"
# ]
# }
# ),
# "role": RelationshipRoleEnum(
# data["Rolle"]["Rollenbezeichnung"]["content"]
# ),
# "type": CompanyRelationshipEnum.COMPANY,
# }
# )
return PersonToCompanyRelationship(
**{ # type: ignore
"name": PersonName(
**{
"firstname": data["Beteiligter"]["Natuerliche_Person"][
"Voller_Name"
]["Vorname"],
"lastname": data["Beteiligter"]["Natuerliche_Person"][
"Voller_Name"
]["Nachname"],
"firstname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
"tns:vollerName"
]["tns:vorname"],
"lastname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
"tns:vollerName"
]["tns:nachname"],
}
),
"date_of_birth": parse_date_of_birth(data),
"location": Location(
**{
"city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
-1
]["Ort"]
]["tns:ort"]
if isinstance(
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list
data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"], list
)
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
"Ort"
else data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
"tns:ort"
]
}
),
"role": RelationshipRoleEnum(
data["Rolle"]["Rollenbezeichnung"]["content"]
# TODO get role via ID
"role": map_role_id_to_enum(
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
),
"type": CompanyRelationshipEnum.PERSON,
}
@ -207,28 +219,36 @@ def loc_from_beteiligung(data: dict) -> Location:
Returns:
Location: location
"""
base = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][
"Beteiligter"
]["Organisation"]["Anschrift"]
base_path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
0,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation",
"tns:anschrift"
]
base = traversal(data, base_path)
house_number = None
street = None
if "Strasse" in base:
if "tns:strasse" in base:
regex = r".(\d+)$"
hits = re.findall(regex, base["Strasse"])
hits = re.findall(regex, base["tns:strasse"])
if len(hits) == 1:
house_number = hits[0]
street = base["Strasse"][: (-1 * len(house_number))]
if "Hausnummer" in base:
house_number = house_number + base["Hausnummer"]
street = base["tns:strasse"][: (-1 * len(house_number))]
if "tns:hausnummer" in base:
house_number = house_number + base["tns:hausnummer"]
else:
if "Hausnummer" in base:
house_number = base["Hausnummer"]
street = base["Strasse"]
if "tns:hausnummer" in base:
house_number = base["tns:hausnummer"]
street = base["tns:strasse"]
return Location(
**{
"city": base["Ort"],
"zip_code": base["Postleitzahl"],
"city": base["tns:ort"],
"zip_code": base["tns:postleitzahl"],
"street": normalize_street(street), # type: ignore
"house_number": house_number,
}
@ -244,9 +264,18 @@ def name_from_beteiligung(data: dict) -> str:
Returns:
str: Company name
"""
name = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][
"Beteiligter"
]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
0,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation",
"tns:bezeichnung",
"tns:bezeichnung.aktuell"
]
name = traversal(data, path)
return remove_traling_and_leading_quotes(name)
@ -261,12 +290,18 @@ def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
CompanyTypeEnum | None: Company type if found
"""
try:
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:rechtstraeger",
"tns:angabenZurRechtsform",
"tns:rechtsform",
"code"
]
return CompanyTypeEnum(
data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Rechtstraeger"
]["Rechtsform"]["content"]
traversal(data, path)
)
except KeyError:
except Exception:
if (
company_name.endswith("GmbH")
or company_name.endswith("UG")
@ -291,14 +326,14 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
Capital | None: Company Capital if found
"""
# Early return
if "Zusatzangaben" not in data["XJustiz_Daten"]["Fachdaten_Register"]:
if "tns:auswahl_zusatzangaben" not in data["tns:fachdatenRegister"]:
return None
capital: dict = {"Zahl": 0.0, "Waehrung": ""}
if company_type == CompanyTypeEnum.KG:
capital_type = "Hafteinlage"
base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Personengesellschaft"
]["Zusatz_KG"]["Daten_Kommanditist"]
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:personengesellschaft"
]["tns:zusatzKG"]["tns:datenKommanditist"]
if isinstance(base, list):
for entry in base:
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
@ -315,22 +350,22 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
CompanyTypeEnum.OHG,
]:
if (
"Kapitalgesellschaft"
not in data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"]
"tns:kapitalgesellschaft"
not in data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"]
):
base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Personengesellschaft"
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:personengesellschaft"
]
else:
base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Kapitalgesellschaft"
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:kapitalgesellschaft"
]
if "Zusatz_GmbH" in base:
if "tns:zusatzGmbH" in base:
capital_type = "Stammkapital"
capital = base["Zusatz_GmbH"]["Stammkapital"]
elif "Zusatz_Aktiengesellschaft" in base:
capital = base["tns:zusatzGmbH"]["tns:stammkapital"]
elif "tns:zusatzAktiengesellschaft" in base:
capital_type = "Grundkapital"
capital = base["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"]
capital = base["tns:zusatzAktiengesellschaft"]["tns:grundkapital"]["tns:hoehe"]
elif company_type in [
CompanyTypeEnum.EINZELKAUFMANN,
CompanyTypeEnum.EG,
@ -345,8 +380,8 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
return None
return Capital(
**{ # type: ignore
"value": float(capital["Zahl"]),
"currency": CurrencyEnum(capital["Waehrung"]),
"value": float(capital["tns:zahl"]),
"currency": CurrencyEnum(capital["tns:waehrung"]["code"]),
"type": CapitalTypeEnum(capital_type),
}
)
@ -362,9 +397,12 @@ def map_business_purpose(data: dict) -> str | None:
str | None: Business purpose if found
"""
try:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gegenstand_oder_Geschaeftszweck"
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:gegenstand"
]
return traversal(data, path)
except KeyError:
return None
@ -418,17 +456,65 @@ def map_founding_date(data: dict) -> str | None:
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0])
if (
"Gruendungsmetadaten"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
"tns:satzungsdatum"
in data["tns:fachdatenRegister"]["tns:basisdatenRegister"]
):
return extract_date_from_string(
data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gruendungsmetadaten"
]["Gruendungsdatum"]
)
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:satzungsdatum",
"tns:aktuellesSatzungsdatum"
]
return traversal(data, path)
# No reliable answer
return None
def traversal(data: dict, path: list[str | int]) -> any:
current = data
for key in path:
try:
current = current[key]
except:
raise KeyError(f"Key {key} not found")
return current
def map_hr_number(data: dict) -> str:
base = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
"tns:aktenzeichen"
]["tns:auswahl_aktenzeichen"]
if "tns:aktenzeichen.strukturiert" in base:
hr_prefix = base["tns:aktenzeichen.strukturiert"]["tns:register"][
"code"
]
hr_number = base["tns:aktenzeichen.strukturiert"]["tns:laufendeNummer"]
return f"{hr_prefix} {hr_number}"
elif "tns:aktenzeichen.freitext" in base:
return base["tns:aktenzeichen.freitext"]
return hr_full
def map_district_court(data: dict) -> DistrictCourt:
base_path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
1,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation"
]
path = [*base_path,
"tns:bezeichnung",
"tns:bezeichnung.aktuell"
]
name = traversal(data, path)
path = [*base_path,
"tns:anschrift",
"tns:ort"
]
city = traversal(data, path)
return DistrictCourt(name=name, city=city)
def map_company_id(data: dict) -> CompanyID:
"""Retrieve Company ID from export.
@ -441,37 +527,8 @@ def map_company_id(data: dict) -> CompanyID:
"""
return CompanyID(
**{
"hr_number": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Instanzdaten"
]["Aktenzeichen"],
"district_court": DistrictCourt(
**{
"name": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Organisation"]["Bezeichnung"][
"Bezeichnung_Aktuell"
]
if "Organisation"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]
else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][
"Nachname"
],
"city": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Organisation"]["Sitz"]["Ort"]
if "Organisation"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]
else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Natuerliche_Person"]["Anschrift"]["Ort"],
}
),
"hr_number": map_hr_number(data),
"district_court": map_district_court(data)
}
)
@ -485,7 +542,12 @@ def map_last_update(data: dict) -> str:
Returns:
str: Last update date
"""
return data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["letzte_Eintragung"]
path = [
"tns:fachdatenRegister",
"tns:auszug",
"tns:letzteEintragung"
]
return traversal(data, path)
def map_co_relation(data: dict) -> dict:
@ -539,9 +601,10 @@ def map_unternehmensregister_json(data: dict) -> Company:
Returns:
Company: Transformed data
"""
root_key = list(data.keys())[0]
data = data[root_key]
result: dict = {"relationships": []}
# TODO Refactor mapping - this is a nightmare...
result["id"] = map_company_id(data)
result["name"] = name_from_beteiligung(data)
@ -553,11 +616,12 @@ def map_unternehmensregister_json(data: dict) -> Company:
result["business_purpose"] = map_business_purpose(data)
result["founding_date"] = map_founding_date(data)
# TODO adapt...
for i in range(
2, len(data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"])
2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
):
people = parse_stakeholder(
data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][i]
data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][i]
)
result["relationships"].append(people)
result = map_co_relation(result)

View File

@ -1,652 +0,0 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import dataclasses
import glob
import json
import os
import re
import sys
import xmltodict
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.string_tools import (
remove_traling_and_leading_quotes,
transform_date_to_iso,
)
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
"""Convert all xml files in a directory to json files.
Args:
source_dir (str): Directory hosting the xml files
target_dir (str): Target directory to move json files to
"""
if not os.path.exists(target_dir):
os.makedirs(target_dir)
for source_path in [
os.path.normpath(i) for i in glob.glob(source_dir + "**/*.xml", recursive=True)
]:
target_path = os.path.join(
target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json")
)
with open(source_path, encoding="utf-8") as source_file:
# deepcode ignore HandleUnicode: Weird XML format no other solution
data = xmltodict.parse(source_file.read().encode())
with open(target_path, "w", encoding="utf-8") as json_file:
json_file.write(json.dumps(data))
def parse_date_of_birth(data: dict) -> str | None:
"""Retreives the date of birth from a stakeholder entry if possible.
Args:
data (dict): Stakeholder data
Returns:
str | None: date of birth or None if not found
"""
if "tns:geburt" in (base := data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]):
base = base["tns:geburt"]["tns:geburtsdatum"]
if isinstance(base, str):
return base
return None
def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
match role_id:
case "086":
return RelationshipRoleEnum.GESCHAEFTSFUEHRER
case "285":
return RelationshipRoleEnum.PROKURIST
case _:
raise KeyError(f'Uknown role_id: {role_id}')
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
"""Extract the company stakeholder/relation from a single "Beteiligung".
Args:
data (dict): Data export
Returns:
CompanyRelationship | None: Relationship if it could be processed
"""
if "tns:natuerlichePerson" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
# It's a Company serving as a "Kommanditist" or similar
# if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
# return CompanyToCompanyRelationship(
# **{ # type: ignore
# "name": remove_traling_and_leading_quotes(
# data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][
# "Nachname"
# ]
# ),
# "location": Location(
# **{
# "city": data["Beteiligter"]["Natuerliche_Person"][
# "Anschrift"
# ][-1]["Ort"]
# if isinstance(
# data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
# list,
# )
# else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
# "Ort"
# ]
# }
# ),
# "role": RelationshipRoleEnum(
# data["Rolle"]["Rollenbezeichnung"]["content"]
# ),
# "type": CompanyRelationshipEnum.COMPANY,
# }
# )
return PersonToCompanyRelationship(
**{ # type: ignore
"name": PersonName(
**{
"firstname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
"tns:vollerName"
]["tns:vorname"],
"lastname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
"tns:vollerName"
]["tns:nachname"],
}
),
"date_of_birth": parse_date_of_birth(data),
"location": Location(
**{
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
-1
]["tns:ort"]
if isinstance(
data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"], list
)
else data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
"tns:ort"
]
}
),
# TODO get role via ID
"role": map_role_id_to_enum(
data["tns:rolle"]["tns:rollenbezeichnung"]["code"]
),
"type": CompanyRelationshipEnum.PERSON,
}
)
if "Organisation" in data["Beteiligter"]:
return CompanyToCompanyRelationship(
**{ # type: ignore
"role": RelationshipRoleEnum(
data["Rolle"]["Rollenbezeichnung"]["content"]
),
"name": remove_traling_and_leading_quotes(
data["Beteiligter"]["Organisation"]["Bezeichnung"][
"Bezeichnung_Aktuell"
]
),
"location": Location(
**{
"city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"],
"street": data["Beteiligter"]["Organisation"]["Anschrift"][
"Strasse"
]
if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"]
else None,
"house_number": data["Beteiligter"]["Organisation"][
"Anschrift"
]["Hausnummer"]
if "Hausnummer"
in data["Beteiligter"]["Organisation"]["Anschrift"]
else None,
"zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][
"Postleitzahl"
]
if "Postleitzahl"
in data["Beteiligter"]["Organisation"]["Anschrift"]
else None,
}
),
"type": CompanyRelationshipEnum.COMPANY,
}
)
return None
def normalize_street(street: str) -> str:
"""Normalize street names by extending them to `Straße` or `straße`.
Args:
street (str): Name of street
Returns:
str: Normalized street name
"""
if street is None:
return None
regex = r"(Str\.|Strasse)"
street = re.sub(regex, "Straße", street)
regex = r"(str\.|strasse)"
street = re.sub(regex, "straße", street)
return street.strip()
def loc_from_beteiligung(data: dict) -> Location:
"""Extract the company location from the first relationship in the export.
Args:
data (dict): Data export
Returns:
Location: location
"""
base_path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
0,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation",
"tns:anschrift"
]
base = traversal(data, base_path)
house_number = None
street = None
if "tns:strasse" in base:
regex = r".(\d+)$"
hits = re.findall(regex, base["tns:strasse"])
if len(hits) == 1:
house_number = hits[0]
street = base["tns:strasse"][: (-1 * len(house_number))]
if "tns:hausnummer" in base:
house_number = house_number + base["tns:hausnummer"]
else:
if "tns:hausnummer" in base:
house_number = base["tns:hausnummer"]
street = base["tns:strasse"]
return Location(
**{
"city": base["tns:ort"],
"zip_code": base["tns:postleitzahl"],
"street": normalize_street(street), # type: ignore
"house_number": house_number,
}
)
def name_from_beteiligung(data: dict) -> str:
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
Args:
data (dict): Data export
Returns:
str: Company name
"""
path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
0,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation",
"tns:bezeichnung",
"tns:bezeichnung.aktuell"
]
name = traversal(data, path)
return remove_traling_and_leading_quotes(name)
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
"""Extracts the company type from a given Unternehmensregister export.
Args:
company_name (str): Name of the company as a fallback solution
data (dict): Data export
Returns:
CompanyTypeEnum | None: Company type if found
"""
try:
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:rechtstraeger",
"tns:angabenZurRechtsform",
"tns:rechtsform",
"code"
]
return CompanyTypeEnum(
traversal(data, path)
)
except Exception:
if (
company_name.endswith("GmbH")
or company_name.endswith("UG")
or company_name.endswith("UG (haftungsbeschränkt)")
):
return CompanyTypeEnum("Gesellschaft mit beschränkter Haftung")
if company_name.endswith("SE"):
return CompanyTypeEnum("Europäische Aktiengesellschaft (SE)")
if company_name.endswith("KG"):
return CompanyTypeEnum("Kommanditgesellschaft")
return None
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
"""Extracts the company capital from the given Unternehmensregister export.
Args:
data (dict): Data export
company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')
Returns:
Capital | None: Company Capital if found
"""
# Early return
if "tns:auswahl_zusatzangaben" not in data["tns:fachdatenRegister"]:
return None
capital: dict = {"Zahl": 0.0, "Waehrung": ""}
if company_type == CompanyTypeEnum.KG:
capital_type = "Hafteinlage"
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:personengesellschaft"
]["tns:zusatzKG"]["tns:datenKommanditist"]
if isinstance(base, list):
for entry in base:
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
capital["Zahl"] = capital["Zahl"] + float(entry["Hafteinlage"]["Zahl"])
capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"]
elif isinstance(base, dict):
capital = base["Hafteinlage"]
elif company_type in [
CompanyTypeEnum.GMBH,
CompanyTypeEnum.SE,
CompanyTypeEnum.AG,
CompanyTypeEnum.KGaA,
CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
CompanyTypeEnum.OHG,
]:
if (
"tns:kapitalgesellschaft"
not in data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"]
):
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:personengesellschaft"
]
else:
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
"tns:kapitalgesellschaft"
]
if "tns:zusatzGmbH" in base:
capital_type = "Stammkapital"
capital = base["tns:zusatzGmbH"]["tns:stammkapital"]
elif "tns:zusatzAktiengesellschaft" in base:
capital_type = "Grundkapital"
capital = base["tns:zusatzAktiengesellschaft"]["tns:grundkapital"]["tns:zahl"]
elif company_type in [
CompanyTypeEnum.EINZELKAUFMANN,
CompanyTypeEnum.EG,
CompanyTypeEnum.PARTNERSCHAFT,
CompanyTypeEnum.PARTNERGESELLSCHAFT,
CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
None,
]:
return None
# Catch entries having the dict but with null values
if not all(capital.values()):
return None
return Capital(
**{ # type: ignore
"value": float(capital["tns:zahl"]),
"currency": CurrencyEnum(capital["tns:waehrung"]["code"]),
"type": CapitalTypeEnum(capital_type),
}
)
def map_business_purpose(data: dict) -> str | None:
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Business purpose if found
"""
try:
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:gegenstand"
]
return traversal(data, path)
except KeyError:
return None
def extract_date_from_string(value: str) -> str | None:
"""Extract a date in ISO format from the given string if possible.
Args:
value (str): Input text
Returns:
str | None: Date in ISO format, None if not found
"""
date_regex = [ # type: ignore
{"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso},
{"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None},
]
results = []
for regex in date_regex:
result = re.findall(regex["regex"], value) # type: ignore
if len(result) == 1:
relevant_data = result[0]
if regex["mapper"] is not None: # type: ignore
results.append(regex["mapper"](relevant_data)) # type: ignore
else:
results.append(relevant_data)
if len(results) != 1:
return None
return results[0]
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Founding date if found
"""
text = str(data)
entry_date = re.findall(
r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0][1])
entry_date = re.findall(
r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0])
if (
"tns:satzungsdatum"
in data["tns:fachdatenRegister"]["tns:basisdatenRegister"]
):
path = [
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:satzungsdatum",
"tns:aktuellesSatzungsdatum"
]
return traversal(data, path)
# No reliable answer
return None
def traversal(data: dict, path: list[str | int]) -> any:
current = data
for key in path:
try:
current = current[key]
except:
raise KeyError(f"Key {key} not found")
return current
def map_hr_number(data: dict) -> str:
hr_prefix = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
"tns:aktenzeichen"
]["tns:auswahl_aktenzeichen"]["tns:aktenzeichen.strukturiert"]["tns:register"][
"code"
]
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
"tns:aktenzeichen"
]["tns:auswahl_aktenzeichen"]["tns:aktenzeichen.strukturiert"]["tns:laufendeNummer"]
hr_full = f"{hr_prefix} {hr_number}"
return hr_full
def map_district_court(data: dict) -> DistrictCourt:
base_path = [
"tns:grunddaten",
"tns:verfahrensdaten",
"tns:beteiligung",
1,
"tns:beteiligter",
"tns:auswahl_beteiligter",
"tns:organisation"
]
path = [*base_path,
"tns:bezeichnung",
"tns:bezeichnung.aktuell"
]
name = traversal(data, path)
path = [*base_path,
"tns:sitz",
"tns:ort"
]
city = traversal(data, path)
return DistrictCourt(name=name, city=city)
def map_company_id(data: dict) -> CompanyID:
"""Retrieve Company ID from export.
Args:
data (dict): Data export
Returns:
CompanyID: ID of the company
"""
return CompanyID(
**{
"hr_number": map_hr_number(data),
"district_court": map_district_court(data)
}
)
def map_last_update(data: dict) -> str:
"""Extract last update date from export.
Args:
data (dict): Unternehmensregister export
Returns:
str: Last update date
"""
path = [
"tns:fachdatenRegister",
"tns:auszug",
"tns:letzteEintragung"
]
return traversal(data, path)
def map_co_relation(data: dict) -> dict:
"""Search for and map the c/o relation from location.street if possible.
Args:
data (dict): Company dict
Returns:
dict: Modified Company dict
"""
street = data["location"].street
if street is None:
return data
parts = street.split(",")
co_company = None
co_company_index = None
for index, part in enumerate(parts):
trimmed_part = part.strip()
result = re.findall(r"^c\/o(.*)$", trimmed_part)
if len(result) == 1:
co_company = result[0].strip()
co_company_index = index
if co_company_index is not None:
del parts[co_company_index]
street = "".join(parts).strip()
data["location"].street = street
if co_company is not None and co_company != "":
relation = CompanyToCompanyRelationship(
RelationshipRoleEnum.CARE_OF, # type: ignore
Location(
data["location"].city,
street,
data["location"].house_number,
data["location"].zip_code,
),
CompanyRelationshipEnum.COMPANY, # type: ignore
co_company,
)
data["relationships"].append(relation)
return data
def map_unternehmensregister_json(data: dict) -> Company:
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
Args:
data (dict): Data export
Returns:
Company: Transformed data
"""
root_key = list(data.keys())[0]
data = data[root_key]
result: dict = {"relationships": []}
result["id"] = map_company_id(data)
result["name"] = name_from_beteiligung(data)
result["location"] = loc_from_beteiligung(data)
result["last_update"] = map_last_update(data)
result["company_type"] = map_rechtsform(result["name"], data)
result["capital"] = map_capital(data, result["company_type"])
result["business_purpose"] = map_business_purpose(data)
result["founding_date"] = map_founding_date(data)
# TODO adapt...
for i in range(
2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
):
people = parse_stakeholder(
data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][i]
)
result["relationships"].append(people)
result = map_co_relation(result)
return Company(**result)
if __name__ == "__main__":
from loguru import logger
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
path = os.path.join(f"{base_path}/export", file)
with open(path, encoding="utf-8") as file_object:
try:
company: Company = map_unternehmensregister_json(
json.loads(file_object.read())
)
name = "".join(e for e in company.name if e.isalnum())[:50]
with open(
f"{base_path}/transformed/{name}.json",
"w+",
encoding="utf-8",
) as export_file:
json.dump(
dataclasses.asdict(company), export_file, ensure_ascii=False
)
except Exception as e:
logger.error(e)
logger.error(f"Error in processing {path}")
sys.exit(1)

File diff suppressed because one or more lines are too long