checkpoint(data-ingestion): Move Unternehmensregister code to .py

This commit is contained in:
TrisNol 2023-09-15 17:22:54 +02:00
parent 8be192e1de
commit bfe50ac76d
5 changed files with 563 additions and 13 deletions

View File

@ -3970,10 +3970,6 @@
"import re\n",
"from aki_prj23_transparenzregister.models.company import Company\n",
"\n",
"content = {\n",
" \"type\": \"Person | Company\",\n",
"}\n",
"\n",
"\n",
"def parse_stakeholder(data: dict) -> list:\n",
" if \"Natuerliche_Person\" in data[\"Beteiligter\"]:\n",
@ -4427,6 +4423,36 @@
"service = CompanyMongoService(connector)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['Die Gesellschaft hat am 31.03.2022 mit der BayWa Aktiengesellschaft mit dem Sitz in München (Amtsgericht München HRB 4921) ']\n",
"['Zwischen der E.ON Kraftwerke GmbH mit dem Sitz in Hannover (Amtsgericht Hannover HRB 58691) ']\n"
]
}
],
"source": [
"import re\n",
"\n",
"texts = [\n",
" \"\"\"\n",
"Die Gesellschaft hat am 31.03.2022 mit der BayWa Aktiengesellschaft mit dem Sitz in M\\u00fcnchen (Amtsgericht M\\u00fcnchen HRB 4921) als herrschender Gesellschaft einen Gewinnabf\\u00fchrungsvertrag geschlossen. \n",
"Die Gesellschafterversammlung hat mit Beschluss vom 31.03.2022 zugestimmt.\"\n",
"\"\"\",\n",
" \"\"\"Zwischen der E.ON Kraftwerke GmbH mit dem Sitz in Hannover (Amtsgericht Hannover HRB 58691) als herrschender Gesellschaft und der Gesellschaft als beherrschter Gesellschaft ist am 26.10.2004 und 08.11.2004 ein Beherrschungs- und Gewinnabf\\u00fchrungsvertrag abgeschlossen worden. \n",
"Die Gesellschafterversammlung der herrschenden Gesellschaft hat dem Vertrag am 08.11.2004 und die Gesellschafterversammlung der beherrschten Gesellschaft hat dem Vertrag am 08.11.2004 zugestimmt.\"\"\",\n",
"]\n",
"\n",
"for text in texts:\n",
" print(re.findall(r\"(.*)als herrschender Gesellschaft\", text))"
]
},
{
"cell_type": "code",
"execution_count": 22,

View File

@ -1,5 +1,4 @@
"""Company model."""
from abc import ABC
from dataclasses import asdict, dataclass
from enum import Enum
@ -33,16 +32,43 @@ class Location:
zip_code: str | None = None
@dataclass
class CompanyRelationship(ABC):
"""_summary_.
class CompanyRelationshipEnum(Enum):
"""Type of companyrelations."""
Args:
ABC (_type_): _description_
"""
PERSON = "Person"
COMPANY = "Company"
@dataclass
class CompanyRelationship:
"""Relation of a Company to a person or another company."""
role: RelationshipRoleEnum
location: Location
type: CompanyRelationshipEnum # noqa: A003
@dataclass
class PersonName:
"""Combination of first and lastname as a class."""
firstname: str
lastname: str
@dataclass
class PersonToCompanyRelationship(CompanyRelationship):
"""Extension of CompanyRelationship with extras for Person."""
name: PersonName
date_of_birth: str
@dataclass
class CompanyToCompanyRelationship(CompanyRelationship):
"""Extension of CompanyRelationship with extras for Company."""
description: str
class FinancialKPIEnum(Enum):

View File

@ -14,7 +14,7 @@ from tqdm import tqdm
logger = logging.getLogger()
def scrape(query: str, download_dir: list[str]):
def scrape(query: str, download_dir: list[str]) -> None:
"""Fetch results from Unternehmensregister for given query.
Args:
@ -152,7 +152,7 @@ def get_num_files(path: str, pattern: str = "*.xml") -> int:
return len(glob.glob1(path, pattern))
def rename_latest_file(path: str, filename: str, pattern: str = "*.xml"):
def rename_latest_file(path: str, filename: str, pattern: str = "*.xml") -> None:
"""Rename file in dir with latest change date.
Args:

View File

@ -0,0 +1,34 @@
"""Load processed Unternehmensregister data into MongoDB."""
import glob
import json
import os
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import Company
from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
CompanyMongoService,
)
from aki_prj23_transparenzregister.utils.mongo.connector import (
MongoConnection,
MongoConnector,
)
if __name__ == "__main__":
conn_string = MongoConnection(
hostname="localhost",
database="transparenzregister",
username="username",
password="",
port=27017,
)
connector = MongoConnector(conn_string)
service = CompanyMongoService(connector)
for file in tqdm(glob.glob1("./data/Unternehmensregister/transformed", "*.json")):
path = os.path.join("./data/Unternehmensregister/transformed", file)
with open(path, encoding="utf-8") as file_object:
data = json.loads(file_object.read())
company: Company = Company(**data)
service.insert(company)

View File

@ -0,0 +1,464 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import dataclasses
import glob
import json
import os
import re
from datetime import datetime
import xmltodict
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import (
Capital,
Company,
CompanyRelationship,
CompanyToCompanyRelationship,
Location,
PersonToCompanyRelationship,
)
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
"""Convert all xml files in a directory to json files.
Args:
source_dir (str): Directory hosting the xml files
target_dir (str): Target directory to move json files to
"""
for source_path in [
os.path.normpath(i) for i in glob.glob(source_dir + "**/*.xml", recursive=True)
]:
target_path = os.path.join(
target_dir, source_path.split("\\")[-1].replace(".xml", ".json")
)
with open(source_path, encoding="utf-8") as source_file:
# deepcode ignore HandleUnicode: Weird XML format no other solution
data = xmltodict.parse(source_file.read().encode())
with open(target_path, "w", encoding="utf-8") as json_file:
json_file.write(json.dumps(data))
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
"""Extract the company stakeholder/relation from a single "Beteiligung".
Args:
data (dict): Data export
Returns:
CompanyRelationship | None: Relationship if it could be processed
"""
if "Natuerliche_Person" in data["Beteiligter"]:
# It's a Compnay serving as a "Kommanditist" or similar
if data["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Vorname"] is None:
return CompanyToCompanyRelationship(
**{
"description": data["Beteiligter"]["Natuerliche_Person"][
"Voller_Name"
]["Nachname"],
"location": {
"city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
-1
]["Ort"]
if isinstance(
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list
)
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][
"Ort"
]
},
"role": data["Rolle"]["Rollenbezeichnung"]["content"],
"type": "Company",
}
)
return PersonToCompanyRelationship(
**{
"name": {
"firstname": data["Beteiligter"]["Natuerliche_Person"][
"Voller_Name"
]["Vorname"],
"lastname": data["Beteiligter"]["Natuerliche_Person"][
"Voller_Name"
]["Nachname"],
},
"date_of_birth": data["Beteiligter"]["Natuerliche_Person"]["Geburt"][
"Geburtsdatum"
]
if "Geburt" in data["Beteiligter"]["Natuerliche_Person"]
else None,
"location": {
"city": data["Beteiligter"]["Natuerliche_Person"]["Anschrift"][-1][
"Ort"
]
if isinstance(
data["Beteiligter"]["Natuerliche_Person"]["Anschrift"], list
)
else data["Beteiligter"]["Natuerliche_Person"]["Anschrift"]["Ort"]
},
"role": data["Rolle"]["Rollenbezeichnung"]["content"],
"type": "Person",
}
)
if "Organisation" in data["Beteiligter"]:
return CompanyToCompanyRelationship(
**{
"role": data["Rolle"]["Rollenbezeichnung"]["content"],
"description": data["Beteiligter"]["Organisation"]["Bezeichnung"][
"Bezeichnung_Aktuell"
],
"location": {
"city": data["Beteiligter"]["Organisation"]["Anschrift"]["Ort"],
"street": data["Beteiligter"]["Organisation"]["Anschrift"][
"Strasse"
]
if "Strasse" in data["Beteiligter"]["Organisation"]["Anschrift"]
else None,
"house_number": data["Beteiligter"]["Organisation"]["Anschrift"][
"Hausnummer"
]
if "Hausnummer" in data["Beteiligter"]["Organisation"]["Anschrift"]
else None,
"zip_code": data["Beteiligter"]["Organisation"]["Anschrift"][
"Postleitzahl"
],
},
"type": "Company",
}
)
return None
def loc_from_beteiligung(data: dict) -> Location:
"""Extract the company location from the first relationship in the export.
Args:
data (dict): Data export
Returns:
Location: location
"""
return Location(
**{
"city": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][0]["Beteiligter"]["Organisation"]["Anschrift"]["Ort"],
"zip_code": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][0]["Beteiligter"]["Organisation"]["Anschrift"]["Postleitzahl"],
"street": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][0]["Beteiligter"]["Organisation"]["Anschrift"]["Strasse"]
if "Strasse"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][
"Beteiligter"
]["Organisation"]["Anschrift"]
else None,
"house_number": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][0]["Beteiligter"]["Organisation"]["Anschrift"]["Hausnummer"]
if "Hausnummer"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][
"Beteiligter"
]["Organisation"]["Anschrift"]
else None,
}
)
def name_from_organisation(data: dict) -> str:
"""Extract the company from the description.
Args:
data (dict): Data export
Returns:
str: Company name
"""
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Organisation"
]["Bezeichnung"]["Bezeichnung_Aktuell"]
def name_from_beteiligung(data: dict) -> str:
"""Extract the Company name from an Unternehmensregister export by using the first relationship found.
Args:
data (dict): Data export
Returns:
str: Company name
"""
return data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][0][
"Beteiligter"
]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
def map_rechtsform(company_name: str, data: dict) -> str | None:
"""Extracts the company type from a given Unternehmensregister export.
Args:
company_name (str): Name of the company as a fallback solution
data (dict): Data export
Returns:
str | None: Company type if found
"""
try:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Rechtstraeger"
]["Rechtsform"]["content"]
except Exception:
if (
company_name.endswith("GmbH")
or company_name.endswith("UG")
or company_name.endswith("UG (haftungsbeschränkt)")
):
return "Gesellschaft mit beschränkter Haftung"
if company_name.endswith("SE"):
return "Europäische Aktiengesellschaft (SE)"
if company_name.endswith("KG"):
return "Kommanditgesellschaft"
return None
def map_capital(data: dict, company_type: str) -> Capital | None:
"""Extracts the company capital from the given Unternehmensregister export.
Args:
data (dict): Data export
company_type (str): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')
Returns:
Capital | None: Company Capital if found
"""
capital: dict = {"Zahl": 0.0, "Waehrung": ""}
if company_type == "Kommanditgesellschaft":
if "Zusatzangaben" not in data["XJustiz_Daten"]["Fachdaten_Register"]:
return None
capital_type = "Hafteinlage"
base = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Personengesellschaft"
]["Zusatz_KG"]["Daten_Kommanditist"]
if isinstance(base, list):
for entry in base:
# TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
capital["Zahl"] = capital["Zahl"] + float(entry["Hafteinlage"]["Zahl"])
capital["Waehrung"] = entry["Hafteinlage"]["Waehrung"]
elif type(base) == "dict":
capital = base["Hafteinlage"]
elif company_type in [
"Gesellschaft mit beschränkter Haftung",
"Europäische Aktiengesellschaft (SE)",
"Aktiengesellschaft",
"Kommanditgesellschaft auf Aktien",
"Rechtsform ausländischen Rechts HRB",
]:
if "Zusatzangaben" not in data["XJustiz_Daten"]["Fachdaten_Register"]:
return None
if (
"Zusatz_GmbH"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Kapitalgesellschaft"
]
):
capital_type = "Stammkapital"
capital = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Kapitalgesellschaft"
]["Zusatz_GmbH"]["Stammkapital"]
elif (
"Zusatz_Aktiengesellschaft"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Kapitalgesellschaft"
]
):
capital_type = "Grundkapital"
capital = data["XJustiz_Daten"]["Fachdaten_Register"]["Zusatzangaben"][
"Kapitalgesellschaft"
]["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"]
elif company_type in [
"Einzelkaufmann",
"Einzelkauffrau",
"eingetragene Genossenschaft",
"Partnerschaft",
"Einzelkaufmann / Einzelkauffrau",
"Offene Handelsgesellschaft",
"Partnerschaftsgesellschaft",
None,
]:
return None
else:
return None
return Capital(
**{
"value": capital["Zahl"],
"currency": capital["Waehrung"],
"type": capital_type,
}
)
def map_business_purpose(data: dict) -> str | None:
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Business purpose if found
"""
try:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gegenstand_oder_Geschaeftszweck"
]
except Exception:
return None
def transform_date_to_iso(date: str) -> str:
"""Transform a date in `DD.MM.YY(YY)` to `YYYY-MM-DD`.
Args:
date (str): Input date
Returns:
str: ISO date
"""
regex_yy = r"^\d{1,2}\.\d{1,2}\.\d{2}$"
input_format = "%d.%m.%y" if re.match(regex_yy, date) else "%d.%m.%Y"
date_temp = datetime.strptime(date, input_format)
return date_temp.strftime("%Y-%m-%d")
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Founding date if found
"""
text = str(data)
entry_date = re.findall(
r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0][1])
entry_date = re.findall(
r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0])
if "Eintragungstext" in data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"] and (
type(data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["Eintragungstext"])
== "list"
):
temp = data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]["Eintragungstext"][
0
]["Text"]
results = re.findall(r"\d{1,2}\.\d{1,2}\.\d{2,4}", temp)
if len(temp) == 1:
return transform_date_to_iso(results[0])
if (
"Gruendungsmetadaten"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
):
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gruendungsmetadaten"
]["Gruendungsdatum"]
# No reliable answer
return None
def map_unternehmensregister_json(data: dict) -> Company:
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
Args:
data (dict): Data export
Returns:
Company: Transformed data
"""
result: dict = {"relationships": []}
# TODO Refactor mapping - this is a nightmare...
result["id"] = {
"hr_number": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Instanzdaten"
]["Aktenzeichen"],
"district_court": {
"name": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
if "Organisation"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][1][
"Beteiligter"
]
else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][
1
]["Beteiligter"]["Natuerliche_Person"]["Voller_Name"]["Nachname"],
"city": data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
"Beteiligung"
][1]["Beteiligter"]["Organisation"]["Sitz"]["Ort"]
if "Organisation"
in data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][1][
"Beteiligter"
]
else data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][
1
]["Beteiligter"]["Natuerliche_Person"]["Anschrift"]["Ort"],
},
}
result["name"] = name_from_beteiligung(data)
result["location"] = loc_from_beteiligung(data)
result["last_update"] = data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"][
"letzte_Eintragung"
]
# TODO New features --> to be tested
result["company_type"] = map_rechtsform(result["name"], data)
result["capital"] = map_capital(data, result["company_type"])
result["business_purpose"] = map_business_purpose(data)
result["founding_date"] = map_founding_date(data)
for i in range(
2, len(data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"])
):
people = parse_stakeholder(
data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"][i]
)
result["relationships"].append(people)
return Company(**result)
if __name__ == "__main__":
from loguru import logger
transform_xml_to_json(
"./data/Unternehmensregister/scraping/", "./data/Unternehmensregister/export/"
)
for file in tqdm(glob.glob1("./data/Unternehmensregister/export", "*.json")):
path = os.path.join("./data/Unternehmensregister/export", file)
with open(path, encoding="utf-8") as file_object:
try:
data = json.loads(file_object.read())
company: Company = map_unternehmensregister_json(data)
name = "".join(e for e in company.name if e.isalnum())[:50]
with open(
f"./data/Unternehmensregister/transformed/{name}.json",
"w+",
encoding="utf-8",
) as export_file:
json.dump(
dataclasses.asdict(company), export_file, ensure_ascii=False
)
except Exception as e:
logger.error(f"Error in processing {path}\n{e}")
break