mirror of
https://github.com/fhswf/aki_prj23_transparenzregister.git
synced 2025-04-25 22:02:35 +02:00
646 lines
22 KiB
Python
646 lines
22 KiB
Python
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
|
|
import dataclasses
|
|
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import xmltodict
|
|
from tqdm import tqdm
|
|
|
|
from aki_prj23_transparenzregister.models.company import (
|
|
Capital,
|
|
CapitalTypeEnum,
|
|
Company,
|
|
CompanyID,
|
|
CompanyRelationship,
|
|
CompanyRelationshipEnum,
|
|
CompanyToCompanyRelationship,
|
|
CompanyTypeEnum,
|
|
CurrencyEnum,
|
|
DistrictCourt,
|
|
Location,
|
|
PersonName,
|
|
PersonToCompanyRelationship,
|
|
RelationshipRoleEnum,
|
|
)
|
|
from aki_prj23_transparenzregister.utils.string_tools import (
|
|
remove_traling_and_leading_quotes,
|
|
transform_date_to_iso,
|
|
)
|
|
|
|
|
|
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
|
|
"""Convert all xml files in a directory to json files.
|
|
|
|
Args:
|
|
source_dir (str): Directory hosting the xml files
|
|
target_dir (str): Target directory to move json files to
|
|
"""
|
|
if not os.path.exists(target_dir):
|
|
os.makedirs(target_dir)
|
|
for source_path in [
|
|
os.path.normpath(i) for i in glob.glob(source_dir + "**/*.xml", recursive=True)
|
|
]:
|
|
target_path = os.path.join(
|
|
target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json")
|
|
)
|
|
|
|
with open(source_path, encoding="utf-8") as source_file:
|
|
# deepcode ignore HandleUnicode: Weird XML format no other solution
|
|
data = xmltodict.parse(source_file.read().encode())
|
|
with open(target_path, "w", encoding="utf-8") as json_file:
|
|
json_file.write(json.dumps(data))
|
|
|
|
|
|
def parse_date_of_birth(data: dict) -> str | None:
|
|
"""Retreives the date of birth from a stakeholder entry if possible.
|
|
|
|
Args:
|
|
data (dict): Stakeholder data
|
|
|
|
Returns:
|
|
str | None: date of birth or None if not found
|
|
"""
|
|
if "tns:geburt" in (base := data["tns:beteiligter"]["tns:auswahl_beteililgter"]["tns:natuerlichePerson"]):
|
|
base = base["tns:geburt"]["tns:geburtsdatum"]
|
|
if isinstance(base, str):
|
|
return base
|
|
return None
|
|
|
|
# def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
|
|
|
|
|
|
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
|
|
"""Extract the company stakeholder/relation from a single "Beteiligung".
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
CompanyRelationship | None: Relationship if it could be processed
|
|
"""
|
|
if "tns:natuerlichePerson" in data["tns:beteiligter"]["tns:auswahl_beteiligter"]:
|
|
# It's a Company serving as a "Kommanditist" or similar
|
|
# if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
|
|
# return CompanyToCompanyRelationship(
|
|
# **{ # type: ignore
|
|
# "name": remove_traling_and_leading_quotes(
|
|
# data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"][
|
|
# "Nachname"
|
|
# ]
|
|
# ),
|
|
# "location": Location(
|
|
# **{
|
|
# "city": data["Beteiligter"]["Natuerliche_Person"][
|
|
# "Anschrift"
|
|
# ][-1]["Ort"]
|
|
# if isinstance(
|
|
# data["Beteiligter"]["Natuerliche_Person"]["Anschrift"],
|
|
# list,
|
|
# )
|
|
# else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
|
|
# "Ort"
|
|
# ]
|
|
# }
|
|
# ),
|
|
# "role": RelationshipRoleEnum(
|
|
# data["Rolle"]["Rollenbezeichnung"]["content"]
|
|
# ),
|
|
# "type": CompanyRelationshipEnum.COMPANY,
|
|
# }
|
|
# )
|
|
return PersonToCompanyRelationship(
|
|
**{ # type: ignore
|
|
"name": PersonName(
|
|
**{
|
|
"firstname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
|
|
"tns:vollerName"
|
|
]["tns:vorname"],
|
|
"lastname": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"][
|
|
"tns:vollerName"
|
|
]["tns:nachname"],
|
|
}
|
|
),
|
|
"date_of_birth": parse_date_of_birth(data),
|
|
"location": Location(
|
|
**{
|
|
"city": data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
|
|
-1
|
|
]["tns:ort"]
|
|
if isinstance(
|
|
data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"], list
|
|
)
|
|
else data["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:natuerlichePerson"]["tns:anschrift"][
|
|
"tns:ort"
|
|
]
|
|
}
|
|
),
|
|
# TODO get role via ID
|
|
"role": RelationshipRoleEnum(
|
|
data["Rolle"]["Rollenbezeichnung"]["content"]
|
|
),
|
|
"type": CompanyRelationshipEnum.PERSON,
|
|
}
|
|
)
|
|
if "Organisation" in data["Beteiligter"]:
|
|
return CompanyToCompanyRelationship(
|
|
**{ # type: ignore
|
|
"role": RelationshipRoleEnum(
|
|
data["Rolle"]["Rollenbezeichnung"]["content"]
|
|
),
|
|
"name": remove_traling_and_leading_quotes(
|
|
data["Beteiligter"]["Organisation"]["Bezeichnung"][
|
|
"Bezeichnung_Aktuell"
|
|
]
|
|
),
|
|
"location": Location(
|
|
**{
|
|
"city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"],
|
|
"street": data["Beteiligter"]["Organisation"]["Anschrift"][
|
|
"Strasse"
|
|
]
|
|
if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"]
|
|
else None,
|
|
"house_number": data["Beteiligter"]["Organisation"][
|
|
"Anschrift"
|
|
]["Hausnummer"]
|
|
if "Hausnummer"
|
|
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
|
else None,
|
|
"zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][
|
|
"Postleitzahl"
|
|
]
|
|
if "Postleitzahl"
|
|
in data["Beteiligter"]["Organisation"]["Anschrift"]
|
|
else None,
|
|
}
|
|
),
|
|
"type": CompanyRelationshipEnum.COMPANY,
|
|
}
|
|
)
|
|
return None
|
|
|
|
|
|
def normalize_street(street: str) -> str:
|
|
"""Normalize street names by extending them to `Straße` or `straße`.
|
|
|
|
Args:
|
|
street (str): Name of street
|
|
|
|
Returns:
|
|
str: Normalized street name
|
|
"""
|
|
if street is None:
|
|
return None
|
|
regex = r"(Str\.|Strasse)"
|
|
street = re.sub(regex, "Straße", street)
|
|
regex = r"(str\.|strasse)"
|
|
street = re.sub(regex, "straße", street)
|
|
return street.strip()
|
|
|
|
|
|
def loc_from_beteiligung(data: dict) -> Location:
|
|
"""Extract the company location from the first relationship in the export.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
Location: location
|
|
"""
|
|
base_path = [
|
|
"tns:grunddaten",
|
|
"tns:verfahrensdaten",
|
|
"tns:beteiligung",
|
|
0,
|
|
"tns:beteiligter",
|
|
"tns:auswahl_beteiligter",
|
|
"tns:organisation",
|
|
"tns:anschrift"
|
|
]
|
|
base = traversal(data, base_path)
|
|
|
|
house_number = None
|
|
street = None
|
|
if "tns:strasse" in base:
|
|
regex = r".(\d+)$"
|
|
hits = re.findall(regex, base["tns:strasse"])
|
|
if len(hits) == 1:
|
|
house_number = hits[0]
|
|
street = base["tns:strasse"][: (-1 * len(house_number))]
|
|
if "tns:hausnummer" in base:
|
|
house_number = house_number + base["tns:hausnummer"]
|
|
else:
|
|
if "tns:hausnummer" in base:
|
|
house_number = base["tns:hausnummer"]
|
|
street = base["tns:strasse"]
|
|
return Location(
|
|
**{
|
|
"city": base["tns:ort"],
|
|
"zip_code": base["tns:postleitzahl"],
|
|
"street": normalize_street(street), # type: ignore
|
|
"house_number": house_number,
|
|
}
|
|
)
|
|
|
|
|
|
def name_from_beteiligung(data: dict) -> str:
|
|
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
str: Company name
|
|
"""
|
|
path = [
|
|
"tns:grunddaten",
|
|
"tns:verfahrensdaten",
|
|
"tns:beteiligung",
|
|
0,
|
|
"tns:beteiligter",
|
|
"tns:auswahl_beteiligter",
|
|
"tns:organisation",
|
|
"tns:bezeichnung",
|
|
"tns:bezeichnung.aktuell"
|
|
]
|
|
name = traversal(data, path)
|
|
return remove_traling_and_leading_quotes(name)
|
|
|
|
|
|
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|
"""Extracts the company type from a given Unternehmensregister export.
|
|
|
|
Args:
|
|
company_name (str): Name of the company as a fallback solution
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
CompanyTypeEnum | None: Company type if found
|
|
"""
|
|
try:
|
|
path = [
|
|
"tns:fachdatenRegister",
|
|
"tns:basisdatenRegister",
|
|
"tns:rechtstraeger",
|
|
"tns:angabenZurRechtsform",
|
|
"tns:rechtsform",
|
|
"code"
|
|
]
|
|
return CompanyTypeEnum(
|
|
traversal(data, path)
|
|
)
|
|
except Exception:
|
|
if (
|
|
company_name.endswith("GmbH")
|
|
or company_name.endswith("UG")
|
|
or company_name.endswith("UG (haftungsbeschränkt)")
|
|
):
|
|
return CompanyTypeEnum("Gesellschaft mit beschränkter Haftung")
|
|
if company_name.endswith("SE"):
|
|
return CompanyTypeEnum("Europäische Aktiengesellschaft (SE)")
|
|
if company_name.endswith("KG"):
|
|
return CompanyTypeEnum("Kommanditgesellschaft")
|
|
return None
|
|
|
|
|
|
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
|
"""Extracts the company capital from the given Unternehmensregister export.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')
|
|
|
|
Returns:
|
|
Capital | None: Company Capital if found
|
|
"""
|
|
# Early return
|
|
if "tns:auswahl_zusatzangaben" not in data["tns:fachdatenRegister"]:
|
|
return None
|
|
capital: dict = {"Zahl": 0.0, "Waehrung": ""}
|
|
if company_type == CompanyTypeEnum.KG:
|
|
capital_type = "Hafteinlage"
|
|
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
|
|
"tns:personengesellschaft"
|
|
]["tns:zusatzKG"]["tns:datenKommanditist"]
|
|
if isinstance(base, list):
|
|
for entry in base:
|
|
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
|
|
capital["Zahl"] = capital["Zahl"] + float(entry["Hafteinlage"]["Zahl"])
|
|
capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"]
|
|
elif isinstance(base, dict):
|
|
capital = base["Hafteinlage"]
|
|
elif company_type in [
|
|
CompanyTypeEnum.GMBH,
|
|
CompanyTypeEnum.SE,
|
|
CompanyTypeEnum.AG,
|
|
CompanyTypeEnum.KGaA,
|
|
CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
|
|
CompanyTypeEnum.OHG,
|
|
]:
|
|
if (
|
|
"tns:kapitalgesellschaft"
|
|
not in data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"]
|
|
):
|
|
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
|
|
"tns:personengesellschaft"
|
|
]
|
|
else:
|
|
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
|
|
"tns:kapitalgesellschaft"
|
|
]
|
|
if "tns:zusatzGmbH" in base:
|
|
capital_type = "Stammkapital"
|
|
capital = base["tns:zusatzGmbH"]["tns:stammkapital"]
|
|
elif "tns:zusatzAktiengesellschaft" in base:
|
|
capital_type = "Grundkapital"
|
|
capital = base["tns:zusatzAktiengesellschaft"]["tns:grundkapital"]["tns:zahl"]
|
|
elif company_type in [
|
|
CompanyTypeEnum.EINZELKAUFMANN,
|
|
CompanyTypeEnum.EG,
|
|
CompanyTypeEnum.PARTNERSCHAFT,
|
|
CompanyTypeEnum.PARTNERGESELLSCHAFT,
|
|
CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
|
|
None,
|
|
]:
|
|
return None
|
|
# Catch entries having the dict but with null values
|
|
if not all(capital.values()):
|
|
return None
|
|
return Capital(
|
|
**{ # type: ignore
|
|
"value": float(capital["tns:zahl"]),
|
|
"currency": CurrencyEnum(capital["tns:waehrung"]["code"]),
|
|
"type": CapitalTypeEnum(capital_type),
|
|
}
|
|
)
|
|
|
|
|
|
def map_business_purpose(data: dict) -> str | None:
|
|
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
str | None: Business purpose if found
|
|
"""
|
|
try:
|
|
path = [
|
|
"tns:fachdatenRegister",
|
|
"tns:basisdatenRegister",
|
|
"tns:gegenstand"
|
|
]
|
|
return traversal(data, path)
|
|
except KeyError:
|
|
return None
|
|
|
|
|
|
def extract_date_from_string(value: str) -> str | None:
|
|
"""Extract a date in ISO format from the given string if possible.
|
|
|
|
Args:
|
|
value (str): Input text
|
|
|
|
Returns:
|
|
str | None: Date in ISO format, None if not found
|
|
"""
|
|
date_regex = [ # type: ignore
|
|
{"regex": r"\d{1,2}\.\d{1,2}\.\d{4}", "mapper": transform_date_to_iso},
|
|
{"regex": r"\d{4}-\d{1,2}-\d{1,2}", "mapper": None},
|
|
]
|
|
results = []
|
|
for regex in date_regex:
|
|
result = re.findall(regex["regex"], value) # type: ignore
|
|
if len(result) == 1:
|
|
relevant_data = result[0]
|
|
if regex["mapper"] is not None: # type: ignore
|
|
results.append(regex["mapper"](relevant_data)) # type: ignore
|
|
else:
|
|
results.append(relevant_data)
|
|
if len(results) != 1:
|
|
return None
|
|
return results[0]
|
|
|
|
|
|
def map_founding_date(data: dict) -> str | None:
|
|
"""Extracts the founding date from a given Unternehmensregister export.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
str | None: Founding date if found
|
|
"""
|
|
text = str(data)
|
|
entry_date = re.findall(
|
|
r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
|
|
)
|
|
if len(entry_date) == 1:
|
|
return transform_date_to_iso(entry_date[0][1])
|
|
|
|
entry_date = re.findall(
|
|
r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
|
|
)
|
|
if len(entry_date) == 1:
|
|
return transform_date_to_iso(entry_date[0])
|
|
if (
|
|
"tns:satzungsdatum"
|
|
in data["tns:fachdatenRegister"]["tns:basisdatenRegister"]
|
|
):
|
|
path = [
|
|
"tns:fachdatenRegister",
|
|
"tns:basisdatenRegister",
|
|
"tns:satzungsdatum",
|
|
"tns:aktuellesSatzungsdatum"
|
|
]
|
|
return traversal(data, path)
|
|
# No reliable answer
|
|
return None
|
|
|
|
def traversal(data: dict, path: list[str | int]) -> any:
|
|
current = data
|
|
for key in path:
|
|
try:
|
|
current = current[key]
|
|
except:
|
|
raise KeyError(f"Key {key} not found")
|
|
return current
|
|
|
|
|
|
def map_hr_number(data: dict) -> str:
|
|
hr_prefix = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
|
|
"tns:aktenzeichen"
|
|
]["tns:auswahl_aktenzeichen"]["tns:aktenzeichen.strukturiert"]["tns:register"][
|
|
"code"
|
|
]
|
|
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
|
|
"tns:aktenzeichen"
|
|
]["tns:auswahl_aktenzeichen"]["tns:aktenzeichen.strukturiert"]["tns:laufendeNummer"]
|
|
hr_full = f"{hr_prefix} {hr_number}"
|
|
return hr_full
|
|
|
|
def map_district_court(data: dict) -> DistrictCourt:
|
|
base_path = [
|
|
"tns:grunddaten",
|
|
"tns:verfahrensdaten",
|
|
"tns:beteiligung",
|
|
1,
|
|
"tns:beteiligter",
|
|
"tns:auswahl_beteiligter",
|
|
"tns:organisation"
|
|
]
|
|
path = [*base_path,
|
|
"tns:bezeichnung",
|
|
"tns:bezeichnung.aktuell"
|
|
]
|
|
name = traversal(data, path)
|
|
path = [*base_path,
|
|
"tns:sitz",
|
|
"tns:ort"
|
|
]
|
|
city = traversal(data, path)
|
|
return DistrictCourt(name=name, city=city)
|
|
|
|
|
|
def map_company_id(data: dict) -> CompanyID:
|
|
"""Retrieve Company ID from export.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
CompanyID: ID of the company
|
|
"""
|
|
return CompanyID(
|
|
**{
|
|
"hr_number": map_hr_number(data),
|
|
"district_court": map_district_court(data)
|
|
}
|
|
)
|
|
|
|
|
|
def map_last_update(data: dict) -> str:
|
|
"""Extract last update date from export.
|
|
|
|
Args:
|
|
data (dict): Unternehmensregister export
|
|
|
|
Returns:
|
|
str: Last update date
|
|
"""
|
|
path = [
|
|
"tns:fachdatenRegister",
|
|
"tns:auszug",
|
|
"tns:letzteEintragung"
|
|
]
|
|
return traversal(data, path)
|
|
|
|
|
|
def map_co_relation(data: dict) -> dict:
|
|
"""Search for and map the c/o relation from location.street if possible.
|
|
|
|
Args:
|
|
data (dict): Company dict
|
|
|
|
Returns:
|
|
dict: Modified Company dict
|
|
"""
|
|
street = data["location"].street
|
|
if street is None:
|
|
return data
|
|
parts = street.split(",")
|
|
co_company = None
|
|
co_company_index = None
|
|
for index, part in enumerate(parts):
|
|
trimmed_part = part.strip()
|
|
result = re.findall(r"^c\/o(.*)$", trimmed_part)
|
|
if len(result) == 1:
|
|
co_company = result[0].strip()
|
|
co_company_index = index
|
|
if co_company_index is not None:
|
|
del parts[co_company_index]
|
|
street = "".join(parts).strip()
|
|
data["location"].street = street
|
|
|
|
if co_company is not None and co_company != "":
|
|
relation = CompanyToCompanyRelationship(
|
|
RelationshipRoleEnum.CARE_OF, # type: ignore
|
|
Location(
|
|
data["location"].city,
|
|
street,
|
|
data["location"].house_number,
|
|
data["location"].zip_code,
|
|
),
|
|
CompanyRelationshipEnum.COMPANY, # type: ignore
|
|
co_company,
|
|
)
|
|
data["relationships"].append(relation)
|
|
return data
|
|
|
|
|
|
def map_unternehmensregister_json(data: dict) -> Company:
|
|
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
|
|
|
Args:
|
|
data (dict): Data export
|
|
|
|
Returns:
|
|
Company: Transformed data
|
|
"""
|
|
root_key = list(data.keys())[0]
|
|
data = data[root_key]
|
|
result: dict = {"relationships": []}
|
|
|
|
result["id"] = map_company_id(data)
|
|
result["name"] = name_from_beteiligung(data)
|
|
|
|
result["location"] = loc_from_beteiligung(data)
|
|
result["last_update"] = map_last_update(data)
|
|
|
|
result["company_type"] = map_rechtsform(result["name"], data)
|
|
result["capital"] = map_capital(data, result["company_type"])
|
|
result["business_purpose"] = map_business_purpose(data)
|
|
result["founding_date"] = map_founding_date(data)
|
|
|
|
# TODO adapt...
|
|
# for i in range(
|
|
# 2, len(data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"])
|
|
# ):
|
|
# people = parse_stakeholder(
|
|
# data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][i]
|
|
# )
|
|
# result["relationships"].append(people)
|
|
result = map_co_relation(result)
|
|
return Company(**result)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from loguru import logger
|
|
|
|
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
|
|
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
|
|
path = os.path.join(f"{base_path}/export", file)
|
|
with open(path, encoding="utf-8") as file_object:
|
|
try:
|
|
company: Company = map_unternehmensregister_json(
|
|
json.loads(file_object.read())
|
|
)
|
|
|
|
name = "".join(e for e in company.name if e.isalnum())[:50]
|
|
|
|
with open(
|
|
f"{base_path}/transformed/{name}.json",
|
|
"w+",
|
|
encoding="utf-8",
|
|
) as export_file:
|
|
json.dump(
|
|
dataclasses.asdict(company), export_file, ensure_ascii=False
|
|
)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error(f"Error in processing {path}")
|
|
sys.exit(1)
|