Feature/additional stammdaten (#132)

Feature/additional stammdaten
This commit is contained in:
Tristan Nolde 2023-09-24 15:31:17 +02:00 committed by GitHub
commit 5c8d20f4c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1800 additions and 149 deletions

File diff suppressed because one or more lines are too long

View File

@ -487,6 +487,17 @@
"num_files"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import glob\n",
"import xmltodict"
]
},
{
"cell_type": "code",
"execution_count": 3,
@ -3905,11 +3916,6 @@
}
],
"source": [
"import json\n",
"import glob\n",
"import xmltodict\n",
"\n",
"\n",
"def transform_xml_to_json(source_dir: str, target_dir: str):\n",
" for source_path in [\n",
" os.path.normpath(i) for i in glob.glob(source_dir + \"**/*.xml\", recursive=True)\n",
@ -3935,7 +3941,7 @@
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": 4,
"metadata": {},
"outputs": [
{
@ -3957,41 +3963,62 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"from models.Company import Company\n",
"import re\n",
"from aki_prj23_transparenzregister.models.company import Company\n",
"\n",
"\n",
"def parse_stakeholder(data: dict) -> list:\n",
" if \"Natuerliche_Person\" in data[\"Beteiligter\"]:\n",
" return {\n",
" \"name\": {\n",
" \"firstname\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Voller_Name\"][\n",
" \"Vorname\"\n",
" ],\n",
" \"lastname\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Voller_Name\"][\n",
" # It's a Compnay serving as a \"Kommanditist\" or similar\n",
" if data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Voller_Name\"][\"Vorname\"] is None:\n",
" return {\n",
" \"description\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Voller_Name\"][\n",
" \"Nachname\"\n",
" ],\n",
" },\n",
" \"date_of_birth\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Geburt\"][\n",
" \"Geburtsdatum\"\n",
" ]\n",
" if \"Geburt\" in data[\"Beteiligter\"][\"Natuerliche_Person\"]\n",
" else None,\n",
" \"location\": {\n",
" \"city\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][-1][\n",
" \"Ort\"\n",
" \"location\": {\n",
" \"city\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][-1][\n",
" \"Ort\"\n",
" ]\n",
" if type(data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"])\n",
" == list\n",
" else data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][\"Ort\"]\n",
" },\n",
" \"role\": data[\"Rolle\"][\"Rollenbezeichnung\"][\"content\"],\n",
" \"type\": \"Company\",\n",
" }\n",
" else:\n",
" return {\n",
" \"name\": {\n",
" \"firstname\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\n",
" \"Voller_Name\"\n",
" ][\"Vorname\"],\n",
" \"lastname\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\n",
" \"Voller_Name\"\n",
" ][\"Nachname\"],\n",
" },\n",
" \"date_of_birth\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Geburt\"][\n",
" \"Geburtsdatum\"\n",
" ]\n",
" if type(data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"]) == list\n",
" else data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][\"Ort\"]\n",
" },\n",
" \"role\": data[\"Rolle\"][\"Rollenbezeichnung\"][\"content\"],\n",
" }\n",
" if \"Geburt\" in data[\"Beteiligter\"][\"Natuerliche_Person\"]\n",
" else None,\n",
" \"location\": {\n",
" \"city\": data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][-1][\n",
" \"Ort\"\n",
" ]\n",
" if type(data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"])\n",
" == list\n",
" else data[\"Beteiligter\"][\"Natuerliche_Person\"][\"Anschrift\"][\"Ort\"]\n",
" },\n",
" \"role\": data[\"Rolle\"][\"Rollenbezeichnung\"][\"content\"],\n",
" \"type\": \"Person\",\n",
" }\n",
" if \"Organisation\" in data[\"Beteiligter\"]:\n",
" return {\n",
" \"role\": \"Organisation\",\n",
" \"role\": data[\"Rolle\"][\"Rollenbezeichnung\"][\"content\"],\n",
" \"description\": data[\"Beteiligter\"][\"Organisation\"][\"Bezeichnung\"][\n",
" \"Bezeichnung_Aktuell\"\n",
" ],\n",
@ -4009,6 +4036,7 @@
" \"Postleitzahl\"\n",
" ],\n",
" },\n",
" \"type\": \"Company\",\n",
" }\n",
"\n",
"\n",
@ -4111,6 +4139,156 @@
" ][\"Organisation\"][\"Bezeichnung\"][\"Bezeichnung_Aktuell\"]\n",
"\n",
"\n",
"# TODO Not present in all companies - possibly map using name of company ...\n",
"def map_rechtsform(company_name: str, data: dict) -> str:\n",
" try:\n",
" return data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Basisdaten_Register\"][\n",
" \"Rechtstraeger\"\n",
" ][\"Rechtsform\"][\"content\"]\n",
" except:\n",
" if (\n",
" company_name.endswith(\"GmbH\")\n",
" or company_name.endswith(\"UG\")\n",
" or company_name.endswith(\"UG (haftungsbeschränkt)\")\n",
" ):\n",
" return \"Gesellschaft mit beschränkter Haftung\"\n",
" elif company_name.endswith(\"SE\"):\n",
" return \"Europäische Aktiengesellschaft (SE)\"\n",
" elif company_name.endswith(\"KG\"):\n",
" return \"Kommanditgesellschaft\"\n",
" return None\n",
"\n",
"\n",
"def map_stammkapital(data: dict, company_type: str) -> str:\n",
" capital = {\"Zahl\": 0, \"Waehrung\": \"\"}\n",
" if company_type == \"Kommanditgesellschaft\":\n",
" if \"Zusatzangaben\" not in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"]:\n",
" return None\n",
" capital_type = \"Hafteinlage\"\n",
" base = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Zusatzangaben\"][\n",
" \"Personengesellschaft\"\n",
" ][\"Zusatz_KG\"][\"Daten_Kommanditist\"]\n",
" if isinstance(base, list):\n",
" for entry in base:\n",
" # TODO link to persons using Ref_Rollennummer then extract [\"Hafteinlage\"] as below\n",
" capital[\"Zahl\"] = capital[\"Zahl\"] + float(entry[\"Hafteinlage\"][\"Zahl\"])\n",
" # TODO Improve multi assignment\n",
" capital[\"Waehrung\"] = entry[\"Hafteinlage\"][\"Waehrung\"]\n",
" elif type(base) == \"dict\":\n",
" capital = base[\"Hafteinlage\"]\n",
" elif company_type in [\n",
" \"Gesellschaft mit beschränkter Haftung\",\n",
" \"Europäische Aktiengesellschaft (SE)\",\n",
" \"Aktiengesellschaft\",\n",
" \"Kommanditgesellschaft auf Aktien\",\n",
" \"Rechtsform ausländischen Rechts HRB\",\n",
" ]:\n",
" if \"Zusatzangaben\" not in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"]:\n",
" return None\n",
" if (\n",
" \"Zusatz_GmbH\"\n",
" in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Zusatzangaben\"][\n",
" \"Kapitalgesellschaft\"\n",
" ]\n",
" ):\n",
" capital_type = \"Stammkapital\"\n",
" capital = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Zusatzangaben\"][\n",
" \"Kapitalgesellschaft\"\n",
" ][\"Zusatz_GmbH\"][\"Stammkapital\"]\n",
" elif (\n",
" \"Zusatz_Aktiengesellschaft\"\n",
" in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Zusatzangaben\"][\n",
" \"Kapitalgesellschaft\"\n",
" ]\n",
" ):\n",
" capital_type = \"Grundkapital\"\n",
" capital = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Zusatzangaben\"][\n",
" \"Kapitalgesellschaft\"\n",
" ][\"Zusatz_Aktiengesellschaft\"][\"Grundkapital\"][\"Hoehe\"]\n",
" elif company_type in [\n",
" \"Einzelkaufmann\",\n",
" \"Einzelkauffrau\",\n",
" \"eingetragene Genossenschaft\",\n",
" \"Partnerschaft\",\n",
" \"Einzelkaufmann / Einzelkauffrau\",\n",
" \"Offene Handelsgesellschaft\",\n",
" \"Partnerschaftsgesellschaft\",\n",
" None,\n",
" ]:\n",
" return None\n",
" else:\n",
" return None\n",
" return {\n",
" \"value\": float(capital[\"Zahl\"]),\n",
" \"currency\": capital[\"Waehrung\"],\n",
" \"type\": capital_type,\n",
" }\n",
"\n",
"\n",
"def map_geschaeftszweck(data: dict) -> str:\n",
" try:\n",
" return data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Basisdaten_Register\"][\n",
" \"Gegenstand_oder_Geschaeftszweck\"\n",
" ]\n",
" except:\n",
" return None\n",
"\n",
"\n",
"from datetime import datetime\n",
"\n",
"\n",
"def transform_date_to_iso(date: str) -> str:\n",
" regex_yy = r\"^\\d{1,2}\\.\\d{1,2}\\.\\d{2}$\"\n",
"\n",
" if re.match(regex_yy, date):\n",
" input_format = \"%d.%m.%y\"\n",
" else:\n",
" input_format = \"%d.%m.%Y\"\n",
" date_temp = datetime.strptime(date, input_format)\n",
" return date_temp.strftime(\"%Y-%m-%d\")\n",
"\n",
"\n",
"# TODO transform date to iso format (YYYY-MM-DD)\n",
"def map_founding_date(data: dict) -> str:\n",
" text = str(data)\n",
" entry_date = re.findall(\n",
" r\".Tag der ersten Eintragung:(\\\\n| )?(\\d{1,2}\\.\\d{1,2}\\.\\d{2,4})\", text\n",
" )\n",
" if len(entry_date) == 1:\n",
" return transform_date_to_iso(entry_date[0][1])\n",
"\n",
" entry_date = re.findall(\n",
" r\".Gesellschaftsvertrag vom (\\d{1,2}\\.\\d{1,2}\\.\\d{2,4})\", text\n",
" )\n",
" if len(entry_date) == 1:\n",
" return transform_date_to_iso(entry_date[0])\n",
"\n",
" if \"Eintragungstext\" in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Auszug\"]:\n",
" if (\n",
" type(\n",
" data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Auszug\"][\"Eintragungstext\"]\n",
" )\n",
" == \"list\"\n",
" ):\n",
" temp = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Auszug\"][\n",
" \"Eintragungstext\"\n",
" ][0][\"Text\"]\n",
" results = re.findall(r\"\\d{1,2}\\.\\d{1,2}\\.\\d{2,4}\", temp)\n",
" if len(temp) == 1:\n",
" return transform_date_to_iso(results[0])\n",
" if (\n",
" \"Gruendungsmetadaten\"\n",
" in data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Basisdaten_Register\"]\n",
" ):\n",
" temp = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Basisdaten_Register\"][\n",
" \"Gruendungsmetadaten\"\n",
" ][\"Gruendungsdatum\"]\n",
" return temp\n",
" # No reliable answer\n",
" # raise ValueError()\n",
" return None\n",
"\n",
"\n",
"def map_unternehmensregister_json(data: dict) -> dict:\n",
" result = {\"relationships\": []}\n",
"\n",
@ -4148,6 +4326,11 @@
" result[\"last_update\"] = data[\"XJustiz_Daten\"][\"Fachdaten_Register\"][\"Auszug\"][\n",
" \"letzte_Eintragung\"\n",
" ]\n",
" # TODO New features --> to be tested\n",
" result[\"company_type\"] = map_rechtsform(result[\"name\"], data)\n",
" result[\"capital\"] = map_stammkapital(data, result[\"company_type\"])\n",
" result[\"business_purpose\"] = map_geschaeftszweck(data)\n",
" result[\"founding_date\"] = map_founding_date(data)\n",
"\n",
" for i in range(\n",
" 2, len(data[\"XJustiz_Daten\"][\"Grunddaten\"][\"Verfahrensdaten\"][\"Beteiligung\"])\n",
@ -4161,90 +4344,118 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"import glob\n",
"import dataclasses\n",
"from tqdm import tqdm"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" 0%| | 0/3381 [00:00<?, ?it/s]"
" 2%|▏ | 55/3381 [00:00<00:06, 549.02it/s]"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 3381/3381 [00:06<00:00, 514.81it/s]\n"
"100%|██████████| 3381/3381 [00:04<00:00, 718.37it/s]\n"
]
}
],
"source": [
"import json\n",
"import dataclasses\n",
"from tqdm import tqdm\n",
"\n",
"for file in tqdm(glob.glob1(\"./data/Unternehmensregister/export\", \"*.json\")):\n",
" path = os.path.join(\"./data/Unternehmensregister/export\", file)\n",
" with open(path, \"r\", encoding=\"utf-8\") as file_object:\n",
" data = json.loads(file_object.read())\n",
" company: Company = map_unternehmensregister_json(data)\n",
" try:\n",
" data = json.loads(file_object.read())\n",
" company: Company = map_unternehmensregister_json(data)\n",
"\n",
" name = \"\".join(e for e in company.name if e.isalnum())[:50]\n",
" name = \"\".join(e for e in company.name if e.isalnum())[:50]\n",
"\n",
" with open(\n",
" f\"./data/Unternehmensregister/transformed/{name}.json\",\n",
" \"w+\",\n",
" encoding=\"utf-8\",\n",
" ) as export_file:\n",
" json.dump(dataclasses.asdict(company), export_file, ensure_ascii=False)"
" with open(\n",
" f\"./data/Unternehmensregister/transformed/{name}.json\",\n",
" \"w+\",\n",
" encoding=\"utf-8\",\n",
" ) as export_file:\n",
" json.dump(dataclasses.asdict(company), export_file, ensure_ascii=False)\n",
" except:\n",
" print(path)\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"from aki_prj23_transparenzregister.utils.mongo.connector import (\n",
" MongoConnector,\n",
" MongoConnection,\n",
")\n",
"from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (\n",
" CompanyMongoService,\n",
")\n",
"\n",
"module_path = os.path.abspath(os.path.join(\"..\"))\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"mongodb://root:pR0R0v2e2@trisnol.tech:27017\n"
]
}
],
"source": [
"from News.utils.mongodb.mongo import MongoConnector\n",
"from Unternehmensregister.utils.CompanyMongoService import CompanyMongoService\n",
"\n",
"connector = MongoConnector(\n",
" hostname=\"trisnol.tech\",\n",
"conn_string = MongoConnection(\n",
" hostname=\"localhost\",\n",
" database=\"transparenzregister\",\n",
" username=\"root\",\n",
" password=\"pR0R0v2e2\",\n",
" username=\"username\",\n",
" password=\"password\",\n",
" port=27017,\n",
")\n",
"\n",
"connector = MongoConnector(conn_string)\n",
"\n",
"service = CompanyMongoService(connector)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['Die Gesellschaft hat am 31.03.2022 mit der BayWa Aktiengesellschaft mit dem Sitz in München (Amtsgericht München HRB 4921) ']\n",
"['Zwischen der E.ON Kraftwerke GmbH mit dem Sitz in Hannover (Amtsgericht Hannover HRB 58691) ']\n"
]
}
],
"source": [
"import re\n",
"\n",
"texts = [\n",
" \"\"\"\n",
"Die Gesellschaft hat am 31.03.2022 mit der BayWa Aktiengesellschaft mit dem Sitz in M\\u00fcnchen (Amtsgericht M\\u00fcnchen HRB 4921) als herrschender Gesellschaft einen Gewinnabf\\u00fchrungsvertrag geschlossen. \n",
"Die Gesellschafterversammlung hat mit Beschluss vom 31.03.2022 zugestimmt.\"\n",
"\"\"\",\n",
" \"\"\"Zwischen der E.ON Kraftwerke GmbH mit dem Sitz in Hannover (Amtsgericht Hannover HRB 58691) als herrschender Gesellschaft und der Gesellschaft als beherrschter Gesellschaft ist am 26.10.2004 und 08.11.2004 ein Beherrschungs- und Gewinnabf\\u00fchrungsvertrag abgeschlossen worden. \n",
"Die Gesellschafterversammlung der herrschenden Gesellschaft hat dem Vertrag am 08.11.2004 und die Gesellschafterversammlung der beherrschten Gesellschaft hat dem Vertrag am 08.11.2004 zugestimmt.\"\"\",\n",
"]\n",
"\n",
"for text in texts:\n",
" print(re.findall(r\"(.*)als herrschender Gesellschaft\", text))"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
@ -4258,14 +4469,14 @@
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 3147/3147 [00:30<00:00, 102.30it/s]"
"100%|██████████| 3147/3147 [00:31<00:00, 99.43it/s] "
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Inserted documents: 0\n"
"Inserted documents: 3147\n"
]
},
{
@ -4279,8 +4490,9 @@
"source": [
"from tqdm import tqdm\n",
"import glob\n",
"import os\n",
"import json\n",
"from Unternehmensregister.models.Company import Company\n",
"from aki_prj23_transparenzregister.models.company import Company\n",
"\n",
"num_inserted = 0\n",
"for file in tqdm(glob.glob1(\"./data/Unternehmensregister/transformed\", \"*.json\")):\n",
@ -4313,7 +4525,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.7"
"version": "3.11.3"
},
"orig_nbformat": 4
},

29
poetry.lock generated
View File

@ -1,4 +1,16 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "aenum"
version = "3.1.15"
description = "Advanced Enumerations (compatible with Python's stdlib Enum), NamedTuples, and NamedConstants"
optional = false
python-versions = "*"
files = [
{file = "aenum-3.1.15-py2-none-any.whl", hash = "sha256:27b1710b9d084de6e2e695dab78fe9f269de924b51ae2850170ee7e1ca6288a5"},
{file = "aenum-3.1.15-py3-none-any.whl", hash = "sha256:e0dfaeea4c2bd362144b87377e2c61d91958c5ed0b4daf89cb6f45ae23af6288"},
{file = "aenum-3.1.15.tar.gz", hash = "sha256:8cbd76cd18c4f870ff39b24284d3ea028fbe8731a58df3aa581e434c575b9559"},
]
[[package]]
name = "alabaster"
@ -5258,7 +5270,7 @@ files = [
]
[package.dependencies]
greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"}
greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"}
mypy = {version = ">=0.910", optional = true, markers = "python_version >= \"3\" and extra == \"mypy\""}
sqlalchemy2-stubs = {version = "*", optional = true, markers = "extra == \"mypy\""}
@ -5779,10 +5791,21 @@ files = [
[package.dependencies]
h11 = ">=0.9.0,<1"
[[package]]
name = "xmltodict"
version = "0.13.0"
description = "Makes working with XML feel like you are working with JSON"
optional = false
python-versions = ">=3.4"
files = [
{file = "xmltodict-0.13.0-py2.py3-none-any.whl", hash = "sha256:aa89e8fd76320154a40d19a0df04a4695fb9dc5ba977cbb68ab3e4eb225e7852"},
{file = "xmltodict-0.13.0.tar.gz", hash = "sha256:341595a488e3e01a85a9d8911d8912fd922ede5fecc4dce437eb4b6c8d037e56"},
]
[extras]
ingest = ["selenium"]
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "cb71ea0797629bb28e89620e47e3b79dd04718e4e5bd75404b15e8e7ab2cf653"
content-hash = "2496706146d1d83ba9f22d7d4ddc9de7019803cc9c6ebeccb2372610ec1cf736"

View File

@ -36,6 +36,7 @@ version = "0.1.0"
[tool.poetry.dependencies]
SQLAlchemy = {version = "^1.4.49", extras = ["mypy"]}
aenum = "^3.1.15"
cachetools = "^5.3.1"
dash = "^2.13.0"
dash-bootstrap-components = "^1.5.0"
@ -50,6 +51,7 @@ python-dotenv = "^1.0.0"
seaborn = "^0.12.2"
selenium = "^4.12.0"
tqdm = "^4.66.1"
xmltodict = "^0.13.0"
[tool.poetry.extras]
ingest = ["selenium"]

View File

@ -10,7 +10,7 @@ class Auditor:
company: str | None
def to_dict(self) -> dict:
"""_summary_.
"""Transform to dict.
Returns:
dict: _description_

View File

@ -2,29 +2,92 @@
from dataclasses import asdict, dataclass
from enum import Enum
from aenum import MultiValueEnum
class RelationshipRoleEnum(Enum):
"""_summary_.
Args:
Enum (_type_): _description_
"""
class RelationshipRoleEnum(str, MultiValueEnum):
"""Roles taken by entities in relationships to a Company."""
STAKEHOLDER = ""
ORGANISATION = "ORGANISATION"
KOMMANDITIST = "Kommanditist(in)", "Kommanditist"
GESCHAEFTSFUEHRER = "Geschäftsführer(in)", "Geschäftsführer"
PROKURIST = "Prokurist(in)", "Prokurist"
VORSTAND = "Vorstand"
INHABER = "Inhaber(in)", "Inhaber"
HAFTENDER_GESELLSCHAFTER = (
"Persönlich haftende(r) Gesellschafter(in)",
"Persönlich haftender Gesellschafter",
)
LIQUIDATOR = "Liquidator(in)", "Liquidator"
PARTNER = "Partner(in)", "Partner"
DIREKTOR = "Geschäftsführende(r) Direktor(in)", "Geschäftsführender Direktor"
LEITUNG = "Mitglied des Leitungsorgans"
VORSTANDSVORSITZENDER = "Vorstandsvorsitzende(r)", "Vorstandsvorsitzender"
NACHFOLGER = "Rechtsnachfolger"
STAENDIGER_VERTRETER = "Ständige(r) Vertreter(in)"
SONSTIGER_VERTRETER = "Sonstige(r) Vertreter(in)", "Sonstiger Vertreter"
GESCHAEFTSLEITER = "Geschäftsleiter(in)", "Geschäftsleiter"
ZWEIGNIEDERLASSUNG = "Zweigniederlassung"
HAUPTNIEDERLASSUNG = "Hauptniederlassung"
class CompanyTypeEnum(str, MultiValueEnum):
"""Type of Company."""
GMBH = "Gesellschaft mit beschränkter Haftung"
SE = "Europäische Aktiengesellschaft (SE)"
KG = "Kommanditgesellschaft"
EINZELKAUFMANN = (
"Einzelkaufmann",
"Einzelkauffrau",
"Einzelkaufmann / Einzelkauffrau",
)
EG = "eingetragene Genossenschaft"
AG = "Aktiengesellschaft"
PARTNERSCHAFTSGESELLSCHAFT = "Partnerschaftsgesellschaft"
PARTNERGESELLSCHAFT = "Partnergesellschaft"
PARTNERSCHAFT = "Partnerschaft"
KGaA = "Kommanditgesellschaft auf Aktien"
OHG = "Offene Handelsgesellschaft"
AUSLAENDISCHE_RECHTSFORM = "Rechtsform ausländischen Rechts HRB"
JURISTISCHE_PERSON = "HRA Juristische Person"
@dataclass
class DistrictCourt:
"""DistrictCourt."""
name: str
city: str
def to_dict(self) -> dict:
"""Transform to dict.
Returns:
dict: Dictionary
"""
return asdict(self)
@dataclass
class CompanyID:
"""_summary_."""
"""CompanyID."""
district_court: str
district_court: DistrictCourt
hr_number: str
def to_dict(self) -> dict:
"""Transform to dict.
Returns:
dict: Dictionary
"""
return asdict(self)
@dataclass
class Location:
"""_summary_."""
"""Location."""
city: str
street: str | None = None
@ -32,12 +95,43 @@ class Location:
zip_code: str | None = None
class CompanyRelationshipEnum(str, Enum):
"""Type of companyrelations."""
PERSON = "Person"
COMPANY = "Company"
@dataclass
class CompanyRelationship:
"""_summary_."""
"""Relation of a Company to a person or another company."""
role: RelationshipRoleEnum
location: Location
type: CompanyRelationshipEnum # noqa: A003
@dataclass
class PersonName:
"""Combination of first and lastname as a class."""
firstname: str
lastname: str
@dataclass
class PersonToCompanyRelationship(CompanyRelationship):
"""Extension of CompanyRelationship with extras for Person."""
name: PersonName
date_of_birth: str
@dataclass
class CompanyToCompanyRelationship(CompanyRelationship):
"""Extension of CompanyRelationship with extras for Company."""
name: str
class FinancialKPIEnum(Enum):
@ -85,10 +179,33 @@ class YearlyResult:
kpis: dict[FinancialKPIEnum, float]
class CurrencyEnum(str, MultiValueEnum):
"""Enum of possible currencies."""
EURO = "EUR"
DEUTSCHE_MARK = "DM", "DEM"
KEINE_ANGABE = ""
class CapitalTypeEnum(str, Enum):
"""Enum of possible capital types."""
HAFTEINLAGE = "Hafteinlage"
STAMMKAPITAL = "Stammkapital"
GRUNDKAPITAL = "Grundkapital"
@dataclass
class Capital:
"""Capital of company."""
value: float
currency: CurrencyEnum
type: CapitalTypeEnum # noqa: A003
@dataclass
class Company:
"""_summary_."""
"""Company dataclass."""
id: CompanyID
@ -96,8 +213,12 @@ class Company:
name: str
last_update: str
relationships: list[CompanyRelationship]
# yearly_results: list[FinancialResults]
# yearly_results: list[FinancialResults]] | None
company_type: CompanyTypeEnum | None = None
capital: Capital | None = None
business_purpose: str | None = None
founding_date: str | None = None
def to_dict(self) -> dict:
"""_summary_."""
"""Transform class to dict."""
return asdict(self)

View File

@ -0,0 +1 @@
"""Everything regarding data extraction from the Unternehmensregister."""

View File

@ -1,20 +1,18 @@
"""Unternehmensregister Scraping."""
import glob
import logging
import multiprocessing
import os
from pathlib import Path
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
logger = logging.getLogger()
def scrape(query: str, download_dir: list[str]):
def scrape(query: str, download_dir: list[str]) -> None:
"""Fetch results from Unternehmensregister for given query.
Args:
@ -152,7 +150,7 @@ def get_num_files(path: str, pattern: str = "*.xml") -> int:
return len(glob.glob1(path, pattern))
def rename_latest_file(path: str, filename: str, pattern: str = "*.xml"):
def rename_latest_file(path: str, filename: str, pattern: str = "*.xml") -> None:
"""Rename file in dir with latest change date.
Args:

View File

@ -0,0 +1,30 @@
"""Load processed Unternehmensregister data into MongoDB."""
import glob
import json
import os
from tqdm import tqdm
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
from aki_prj23_transparenzregister.models.company import Company
from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
CompanyMongoService,
)
from aki_prj23_transparenzregister.utils.mongo.connector import (
MongoConnector,
)
if __name__ == "__main__":
    # Wire up the Mongo-backed company service from the connection string in secrets.json.
    service = CompanyMongoService(
        MongoConnector(
            JsonFileConfigProvider("secrets.json").get_mongo_connection_string()
        )
    )

    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
    transformed_dir = f"{base_path}/transformed"

    # Every transformed *.json file is loaded into a Company and handed to the service.
    for file_name in tqdm(glob.glob1(transformed_dir, "*.json")):
        json_path = os.path.join(transformed_dir, file_name)
        with open(json_path, encoding="utf-8") as file_object:
            payload = json.load(file_object)
        company: Company = Company(**payload)
        service.migrations_of_base_data(company)

View File

@ -0,0 +1,481 @@
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
import dataclasses
import glob
import json
import os
import re
import sys
import xmltodict
from tqdm import tqdm
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationship,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.string_tools import transform_date_to_iso
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
    """Convert all xml files below a directory to json files.

    The source directory is searched recursively. Each ``<name>.xml`` found is
    parsed with ``xmltodict`` and written as ``<name>.json`` directly into
    ``target_dir`` (the sub-directory structure is not reproduced).

    Args:
        source_dir (str): Directory hosting the xml files, with or without a
            trailing path separator
        target_dir (str): Existing directory to write the json files to
    """
    # Join the pattern instead of concatenating it: "**" only recurses when it is
    # a path component of its own, which plain concatenation breaks as soon as
    # source_dir has no trailing separator.
    pattern = os.path.join(source_dir, "**", "*.xml")
    for source_path in glob.glob(pattern, recursive=True):
        # Swap only the extension; str.replace would also rewrite a ".xml" that
        # appears earlier in the file name and would miss e.g. ".XML".
        stem, _extension = os.path.splitext(os.path.basename(source_path))
        target_path = os.path.join(target_dir, stem + ".json")

        with open(source_path, encoding="utf-8") as source_file:
            # deepcode ignore HandleUnicode: Weird XML format no other solution
            data = xmltodict.parse(source_file.read().encode())

        with open(target_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file)
def parse_stakeholder(data: dict) -> CompanyRelationship | None:
    """Turn a single "Beteiligung" entry into a relationship object.

    Args:
        data (dict): One "Beteiligung" entry of the data export

    Returns:
        CompanyRelationship | None: Relationship built from the entry, or None if
            the entry holds neither a "Natuerliche_Person" nor an "Organisation"
    """

    def city_of(address):
        # "Anschrift" is either a single address or a list of addresses;
        # for a list the last entry is the one that is used.
        if isinstance(address, list):
            return address[-1]["Ort"]
        return address["Ort"]

    participant = data["Beteiligter"]

    if "Natuerliche_Person" in participant:
        person = participant["Natuerliche_Person"]
        full_name = person["Voller_Name"]

        # It's a company serving as a "Kommanditist" or similar: such entries
        # carry no first name and keep the company name in "Nachname".
        if full_name["Vorname"] is None:
            company_name = full_name["Nachname"]
            company_location = Location(city=city_of(person["Anschrift"]))
            company_role = RelationshipRoleEnum(
                data["Rolle"]["Rollenbezeichnung"]["content"]
            )
            return CompanyToCompanyRelationship(
                name=company_name,
                location=company_location,
                role=company_role,
                type=CompanyRelationshipEnum.COMPANY,
            )

        person_name = PersonName(
            firstname=full_name["Vorname"],
            lastname=full_name["Nachname"],
        )
        # The birth section is optional in the export.
        if "Geburt" in person:
            birth_date = person["Geburt"]["Geburtsdatum"]
        else:
            birth_date = None
        person_location = Location(city=city_of(person["Anschrift"]))
        person_role = RelationshipRoleEnum(
            data["Rolle"]["Rollenbezeichnung"]["content"]
        )
        return PersonToCompanyRelationship(
            name=person_name,
            date_of_birth=birth_date,
            location=person_location,
            role=person_role,
            type=CompanyRelationshipEnum.PERSON,
        )

    if "Organisation" in participant:
        organisation = participant["Organisation"]
        organisation_role = RelationshipRoleEnum(
            data["Rolle"]["Rollenbezeichnung"]["content"]
        )
        organisation_name = organisation["Bezeichnung"]["Bezeichnung_Aktuell"]
        address = organisation["Anschrift"]
        # Only the city is mandatory; the remaining address parts default to None.
        organisation_location = Location(
            city=address["Ort"],
            street=address.get("Strasse"),
            house_number=address.get("Hausnummer"),
            zip_code=address.get("Postleitzahl"),
        )
        return CompanyToCompanyRelationship(
            role=organisation_role,
            name=organisation_name,
            location=organisation_location,
            type=CompanyRelationshipEnum.COMPANY,
        )

    return None
def loc_from_beteiligung(data: dict) -> Location:
    """Read the company address from the first "Beteiligung" of the export.

    Args:
        data (dict): Data export

    Returns:
        Location: Location built from the organisation's "Anschrift"; street and
            house number are None when the export does not contain them
    """
    first_participation = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ][0]
    address = first_participation["Beteiligter"]["Organisation"]["Anschrift"]

    # City and zip code are required keys; street and house number are optional.
    return Location(
        city=address["Ort"],
        zip_code=address["Postleitzahl"],
        street=address.get("Strasse"),
        house_number=address.get("Hausnummer"),
    )
def name_from_beteiligung(data: dict) -> str:
    """Read the company name from the first "Beteiligung" of an Unternehmensregister export.

    Args:
        data (dict): Data export

    Returns:
        str: Value of "Bezeichnung_Aktuell" of the first relationship's organisation
    """
    participations = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"][
        "Beteiligung"
    ]
    organisation = participations[0]["Beteiligter"]["Organisation"]
    return organisation["Bezeichnung"]["Bezeichnung_Aktuell"]
def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
    """Determine the company type of an Unternehmensregister export.

    The "Rechtsform" stored in the export is used when present. If one of the keys
    on the way to it is missing, the type is guessed from the company name suffix.

    Args:
        company_name (str): Name of the company, used for the suffix fallback
        data (dict): Data export

    Returns:
        CompanyTypeEnum | None: Company type, None if neither source yields one
    """
    try:
        return CompanyTypeEnum(
            data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
                "Rechtstraeger"
            ]["Rechtsform"]["content"]
        )
    except KeyError:
        # Checked in order: name suffixes -> legal form to report for them.
        fallback_by_suffix = (
            (
                ("GmbH", "UG", "UG (haftungsbeschränkt)"),
                "Gesellschaft mit beschränkter Haftung",
            ),
            (("SE",), "Europäische Aktiengesellschaft (SE)"),
            (("KG",), "Kommanditgesellschaft"),
        )
        for suffixes, legal_form in fallback_by_suffix:
            if company_name.endswith(suffixes):
                return CompanyTypeEnum(legal_form)
    return None
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
    """Extracts the company capital from the given Unternehmensregister export.

    For a KG the "Hafteinlage" of all listed Kommanditisten is summed up (the
    currency of the last entry is used). For the capital-company types the
    "Stammkapital" (Zusatz_GmbH) or "Grundkapital" (Zusatz_Aktiengesellschaft)
    is read from "Kapitalgesellschaft", falling back to "Personengesellschaft".

    Args:
        data (dict): Data export
        company_type (CompanyTypeEnum): Type of company (e.g., 'Gesellschaft mit beschränkter Haftung')

    Returns:
        Capital | None: Company Capital if found; None if the export has no
            "Zusatzangaben", the company type carries no capital, or any of
            the capital values is empty/null
    """
    register = data["XJustiz_Daten"]["Fachdaten_Register"]
    # Early return
    if "Zusatzangaben" not in register:
        return None
    additions = register["Zusatzangaben"]

    # Placeholder with falsy values: if no branch below replaces it, the
    # null-value guard at the end turns it into a None result.
    capital: dict = {"Zahl": 0.0, "Waehrung": ""}
    if company_type == CompanyTypeEnum.KG:
        capital_type = "Hafteinlage"
        base = additions["Personengesellschaft"]["Zusatz_KG"]["Daten_Kommanditist"]
        if isinstance(base, list):
            for entry in base:
                # TODO link to persons using Ref_Rollennummer then extract ["Hafteinlage"] as below
                contribution = entry["Hafteinlage"]
                # Apply the same null-value guard as for a single entry;
                # float(None) would otherwise raise before the final check.
                if not all(contribution.values()):
                    return None
                capital["Zahl"] = capital["Zahl"] + float(contribution["Zahl"])
                capital["Waehrung"] = contribution["Waehrung"]
        elif isinstance(base, dict):
            capital = base["Hafteinlage"]
    elif company_type in [
        CompanyTypeEnum.GMBH,
        CompanyTypeEnum.SE,
        CompanyTypeEnum.AG,
        CompanyTypeEnum.KGaA,
        CompanyTypeEnum.AUSLAENDISCHE_RECHTSFORM,
        CompanyTypeEnum.OHG,
    ]:
        if "Kapitalgesellschaft" not in additions:
            base = additions["Personengesellschaft"]
        else:
            base = additions["Kapitalgesellschaft"]
        if "Zusatz_GmbH" in base:
            capital_type = "Stammkapital"
            capital = base["Zusatz_GmbH"]["Stammkapital"]
        elif "Zusatz_Aktiengesellschaft" in base:
            capital_type = "Grundkapital"
            capital = base["Zusatz_Aktiengesellschaft"]["Grundkapital"]["Hoehe"]
    elif company_type in [
        CompanyTypeEnum.EINZELKAUFMANN,
        CompanyTypeEnum.EG,
        CompanyTypeEnum.PARTNERSCHAFT,
        CompanyTypeEnum.PARTNERGESELLSCHAFT,
        CompanyTypeEnum.PARTNERSCHAFTSGESELLSCHAFT,
        None,
    ]:
        return None
    # Catch entries having the dict but with null values
    if not all(capital.values()):
        return None
    return Capital(
        **{  # type: ignore
            "value": float(capital["Zahl"]),
            "currency": CurrencyEnum(capital["Waehrung"]),
            "type": CapitalTypeEnum(capital_type),
        }
    )
def map_business_purpose(data: dict) -> str | None:
"""Extracts the "Geschäftszweck" from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Business purpose if found
"""
try:
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gegenstand_oder_Geschaeftszweck"
]
except KeyError:
return None
def map_founding_date(data: dict) -> str | None:
"""Extracts the founding date from a given Unternehmensregister export.
Args:
data (dict): Data export
Returns:
str | None: Founding date if found
"""
text = str(data)
entry_date = re.findall(
r".Tag der ersten Eintragung:(\\n| )?(\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0][1])
entry_date = re.findall(
r".Gesellschaftsvertrag vom (\d{1,2}\.\d{1,2}\.\d{2,4})", text
)
if len(entry_date) == 1:
return transform_date_to_iso(entry_date[0])
if (
"Gruendungsmetadaten"
in data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"]
):
return data["XJustiz_Daten"]["Fachdaten_Register"]["Basisdaten_Register"][
"Gruendungsmetadaten"
]["Gruendungsdatum"]
# No reliable answer
return None
def map_company_id(data: dict) -> CompanyID:
    """Build the CompanyID of an export from its file number and its second "Beteiligung" entry.

    The second entry supplies name and city of the district court. It is read
    from "Organisation" when that key is present and from "Natuerliche_Person"
    otherwise.

    Args:
        data (dict): Data export

    Returns:
        CompanyID: ID of the company
    """
    proceedings = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]
    hr_number = proceedings["Instanzdaten"]["Aktenzeichen"]

    court = proceedings["Beteiligung"][1]["Beteiligter"]
    if "Organisation" in court:
        court_name = court["Organisation"]["Bezeichnung"]["Bezeichnung_Aktuell"]
        court_city = court["Organisation"]["Sitz"]["Ort"]
    else:
        court_name = court["Natuerliche_Person"]["Voller_Name"]["Nachname"]
        court_city = court["Natuerliche_Person"]["Anschrift"]["Ort"]

    return CompanyID(
        hr_number=hr_number,
        district_court=DistrictCourt(name=court_name, city=court_city),
    )
def map_last_update(data: dict) -> str:
    """Read the "letzte_Eintragung" value of an export.

    Args:
        data (dict): Unternehmensregister export

    Returns:
        str: Last update date
    """
    excerpt = data["XJustiz_Daten"]["Fachdaten_Register"]["Auszug"]
    return excerpt["letzte_Eintragung"]
def map_unternehmensregister_json(data: dict) -> Company:
    """Convert a structured Unternehmensregister export into a Company.

    Each attribute is produced by its dedicated helper. All "Beteiligung"
    entries from the third one onwards are parsed as relationships.

    Args:
        data (dict): Data export

    Returns:
        Company: Transformed data
    """
    # TODO Refactor mapping - this is a nightmare...
    company_id = map_company_id(data)
    name = name_from_beteiligung(data)
    location = loc_from_beteiligung(data)
    last_update = map_last_update(data)
    company_type = map_rechtsform(name, data)
    capital = map_capital(data, company_type)
    business_purpose = map_business_purpose(data)
    founding_date = map_founding_date(data)

    participants = data["XJustiz_Daten"]["Grunddaten"]["Verfahrensdaten"]["Beteiligung"]
    # The first two entries are consumed by the name/location and company-id helpers above
    relationships = [
        parse_stakeholder(participants[index])
        for index in range(2, len(participants))
    ]

    return Company(
        relationships=relationships,
        id=company_id,
        name=name,
        location=location,
        last_update=last_update,
        company_type=company_type,
        capital=capital,
        business_purpose=business_purpose,
        founding_date=founding_date,
    )
if __name__ == "__main__":
    # Script entry point: transform every exported JSON file into a Company
    # and write its dict form to the "transformed" directory.
    from loguru import logger

    # transform_xml_to_json(
    #     "./data/Unternehmensregister/scraping/", "./data/Unternehmensregister/export/"
    # )
    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
    export_dir = f"{base_path}/export"
    # glob.glob with root_dir yields names relative to export_dir, replacing
    # the undocumented and deprecated glob.glob1 helper.
    for file in tqdm(glob.glob("*.json", root_dir=export_dir)):
        path = os.path.join(export_dir, file)
        with open(path, encoding="utf-8") as file_object:
            try:
                data = json.load(file_object)
                company: Company = map_unternehmensregister_json(data)

                # File name: alphanumeric characters of the company name, capped at 50
                name = "".join(e for e in company.name if e.isalnum())[:50]

                with open(
                    f"{base_path}/transformed/{name}.json",
                    "w+",
                    encoding="utf-8",
                ) as export_file:
                    json.dump(
                        dataclasses.asdict(company), export_file, ensure_ascii=False
                    )
            except Exception:
                # logger.exception keeps the traceback, so the cause of the
                # failure is visible before the run is aborted.
                logger.exception(f"Error in processing {path}")
                sys.exit(1)

View File

@ -9,10 +9,10 @@ from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
class CompanyMongoService:
"""_summary_."""
"""Wrapper for MongoDB regarding management of Company documents."""
def __init__(self, connector: MongoConnector):
"""_summary_.
"""Constructor.
Args:
connector (MongoConnector): _description_
@ -21,26 +21,40 @@ class CompanyMongoService:
self.lock = Lock() # Create a lock for synchronization
def get_all(self) -> list[Company]:
"""_summary_.
"""Get all Company documents.
Returns:
list[Company]: _description_
list[Company]: List of retrieved companies
"""
with self.lock:
result = self.collection.find()
return list(result)
def get_by_id(self, id: str) -> Company | None:
"""_summary_.
def get_by_id(self, id: dict) -> dict | None:
"""Get a Company document by the given id.
Args:
id (str): _description_
id (CompanyID): CompanyID
Returns:
Company | None: _description_
dict | None: Company if found
"""
with self.lock:
result = list(self.collection.find({"id": id}))
result = list(
self.collection.find(
{
"id": {
"$eq": {
"hr_number": id["hr_number"],
"district_court": {
"name": id["district_court"]["name"],
"city": id["district_court"]["city"],
},
}
}
}
)
)
if len(result) == 1:
return result[0]
return None
@ -81,7 +95,7 @@ class CompanyMongoService:
return list(self.collection.find({"yearly_results": {"$gt": {}}}))
def insert(self, company: Company) -> InsertOneResult:
"""_summary_.
"""Insert a new Company document.
Args:
company (Company): _description_
@ -106,3 +120,21 @@ class CompanyMongoService:
return self.collection.update_one(
{"_id": ObjectId(_id)}, {"$set": {"yearly_results": yearly_results}}
)
def migrations_of_base_data(self, data: Company) -> InsertOneResult | UpdateResult:
    """Insert the company, or update the stored document that has the same CompanyID.

    Args:
        data (Company): Company related data to persist

    Returns:
        InsertOneResult | UpdateResult: Result depending on action
    """
    existing = self.get_by_id(data.id.to_dict())
    if existing is not None:
        # Overwrite every field contained in the company's dict form
        update = {"$set": dict(data.to_dict().items())}
        document_filter = {"_id": ObjectId(existing["_id"])}
        with self.lock:
            return self.collection.update_one(document_filter, update)
    return self.insert(data)

View File

@ -6,7 +6,7 @@ import pymongo
@dataclass
class MongoConnection:
"""_summary_."""
"""Wrapper for MongoDB connection string."""
hostname: str
database: str
@ -36,7 +36,7 @@ class MongoConnector:
"""Wrapper for establishing a connection to a MongoDB instance."""
def __init__(self, connection: MongoConnection):
"""_summary_.
"""Wrapper for MongoDB collection.
Args:
connection (MongoConnection): Wrapper for connection string

View File

@ -6,14 +6,10 @@ from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
class MongoNewsService:
"""_summary_.
Args:
NewsServiceInterface (_type_): _description_
"""
"""Wrapper for MongoDB regarding News documents."""
def __init__(self, connector: MongoConnector):
"""_summary_.
"""Constructor.
Args:
connector (MongoConnector): _description_
@ -21,7 +17,7 @@ class MongoNewsService:
self.collection = connector.database["news"]
def get_all(self) -> list[News]:
"""_summary_.
"""Get all News documents.
Returns:
list[News]: _description_
@ -30,7 +26,7 @@ class MongoNewsService:
return [MongoEntryTransformer.transform_outgoing(elem) for elem in result]
def get_by_id(self, id: str) -> News | None:
"""_summary_.
"""Get a News document by the given id.
Args:
id (str): _description_
@ -44,7 +40,7 @@ class MongoNewsService:
return None
def insert(self, news: News) -> InsertOneResult:
"""_summary_.
"""Insert a new News document.
Args:
news (News): _description_
@ -56,11 +52,7 @@ class MongoNewsService:
class MongoEntryTransformer:
"""_summary_.
Returns:
_type_: _description_
"""
"""Transform a dict to News entity and back."""
@staticmethod
def transform_ingoing(news: News) -> dict:

View File

@ -1,4 +1,6 @@
"""Contains functions fot string manipulation."""
import re
from datetime import datetime
def simplify_string(string_to_simplify: str | None) -> str | None:
@ -16,3 +18,19 @@ def simplify_string(string_to_simplify: str | None) -> str | None:
else:
raise TypeError("The string to simplify is not a string.")
return string_to_simplify if string_to_simplify else None
def transform_date_to_iso(date: str) -> str:
    """Convert a `DD.MM.YY` or `DD.MM.YYYY` date into `YYYY-MM-DD`.

    Args:
        date (str): Input date

    Returns:
        str: ISO date
    """
    # A two-digit year needs %y; everything else is parsed with a full %Y year
    has_short_year = re.match(r"^\d{1,2}\.\d{1,2}\.\d{2}$", date)
    if has_short_year:
        parsed = datetime.strptime(date, "%d.%m.%y")
    else:
        parsed = datetime.strptime(date, "%d.%m.%Y")
    return parsed.strftime("%Y-%m-%d")

View File

@ -1,26 +1,43 @@
"""Test Models.company."""
from aki_prj23_transparenzregister.models.company import Company, CompanyID, Location
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
)
def test_to_dict() -> None:
"""Tests if the version tag is entered."""
company_id = CompanyID("The Shire", "420")
district_court = DistrictCourt("abc", "abc")
company_id = CompanyID(district_court=district_court, hr_number="HRB 123")
location = Location(
city="Insmouth", house_number="19", street="Harbor", zip_code="1890"
)
capital = Capital(
currency=CurrencyEnum.DEUTSCHE_MARK, type=CapitalTypeEnum.GRUNDKAPITAL, value=42 # type: ignore
)
company = Company(
id=company_id,
last_update="Tomorrow",
location=location,
name="BLANK GmbH",
relationships=[],
business_purpose="Blockchain and NFTs",
capital=capital,
company_type=CompanyTypeEnum.AG, # type: ignore
founding_date="Yesterday",
)
assert company.to_dict() == {
"id": {
"district_court": company_id.district_court,
"district_court": district_court.to_dict(),
"hr_number": company_id.hr_number,
},
"last_update": company.last_update,
@ -32,4 +49,12 @@ def test_to_dict() -> None:
},
"name": "BLANK GmbH",
"relationships": [],
"business_purpose": "Blockchain and NFTs",
"capital": {
"value": capital.value,
"currency": capital.currency,
"type": capital.type,
},
"company_type": company.company_type,
"founding_date": "Yesterday",
}

View File

@ -0,0 +1,89 @@
"""Testing utisl/data_extraction/unternehmensregister/extract.py."""
import os
from tempfile import TemporaryDirectory
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
extract,
)
def prepare_temporary_dir(directory: str, formats: list[str]) -> None:
    """Create one small text file per entry of `formats` inside `directory`.

    The files are named `file-<index>.<format>` and contain `Hello There <index>`.

    Args:
        directory (str): Directory to create the files in
        formats (list[str]): File extensions, one file is written per entry
    """
    for index, extension in enumerate(formats):
        target = os.path.join(directory, f"file-{index}.{extension}")
        with open(target, "w") as handle:
            handle.write(f"Hello There {index}")
def test_rename_latest_file() -> None:
    """The most recently written file matching the pattern is renamed, the others stay."""
    import time

    with TemporaryDirectory(dir="./") as temp_dir:
        # Three XML files written one after another, spaced out so that their
        # modification times differ
        xml_files = [
            os.path.join(temp_dir, f"file{number}.xml") for number in (1, 2, 3)
        ]
        for number, xml_file in enumerate(xml_files, start=1):
            with open(xml_file, "w") as handle:
                handle.write(f"Content {number}")
            time.sleep(0.15)
        first_xml, second_xml, third_xml = xml_files

        # Default pattern: the latest file ('file3.xml') becomes 'new_file.xml'
        extract.rename_latest_file(temp_dir, "new_file.xml")
        assert not os.path.exists(third_xml)
        assert os.path.exists(os.path.join(temp_dir, "new_file.xml"))

        # The two older files are untouched
        assert os.path.exists(first_xml)
        assert os.path.exists(second_xml)

        # Different pattern: only '*.txt' files are candidates
        with open(first_xml, "w") as handle:
            handle.write("Content 4")
        text_file = os.path.join(temp_dir, "file4.txt")
        with open(text_file, "w") as handle:
            handle.write("Content 5")

        extract.rename_latest_file(temp_dir, "new_file.txt", pattern="*.txt")
        assert not os.path.exists(text_file)
        assert os.path.exists(os.path.join(temp_dir, "new_file.txt"))

        # 'file1.xml' is still present and unchanged
        with open(first_xml) as handle:
            assert handle.read() == "Content 4"
def test_get_num_files_default_pattern() -> None:
    """Without an explicit pattern all three XML files are counted."""
    with TemporaryDirectory(dir="./") as temp_dir:
        prepare_temporary_dir(temp_dir, ["xml", "xml", "xml"])
        num_files = extract.get_num_files(temp_dir)
        assert num_files == 3
def test_get_num_files_different_pattern() -> None:
    """With the pattern `*.txt` only the single text file is counted."""
    with TemporaryDirectory(dir="./") as temp_dir:
        prepare_temporary_dir(temp_dir, ["xml", "txt", "json"])
        expected_result = 1
        assert extract.get_num_files(temp_dir, "*.txt") == expected_result
def test_wait_for_download_condition() -> None:
    """For a directory with one XML and one TXT file and the argument 2 the result is False."""
    with TemporaryDirectory(dir="./") as temp_dir:
        prepare_temporary_dir(temp_dir, ["xml", "txt"])
        condition = extract.wait_for_download_condition(temp_dir, 2)
        assert condition is False
def test_scrape() -> None:
    """Smoke test: `scrape` runs for one company name without raising."""
    with TemporaryDirectory(dir="./") as download_dir:
        company_name = "GEA Farm Technologies GmbH"
        extract.scrape(company_name, [download_dir])

View File

@ -0,0 +1,8 @@
"""Test load utils from Unternehmensregister."""
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
load,
)
def test_smoke() -> None:
    """Smoke test: the `load` module was imported and is truthy."""
    assert bool(load)

View File

@ -0,0 +1,592 @@
"""Testing utils/data_extraction/unternehmensregister/transform.py."""
import json
import os
from tempfile import TemporaryDirectory
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.models.company import (
Capital,
CapitalTypeEnum,
Company,
CompanyID,
CompanyRelationshipEnum,
CompanyToCompanyRelationship,
CompanyTypeEnum,
CurrencyEnum,
DistrictCourt,
Location,
PersonName,
PersonToCompanyRelationship,
RelationshipRoleEnum,
)
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
transform,
)
def test_transform_xml_to_json() -> None:
with TemporaryDirectory(dir="./") as temp_source_dir:
with open(os.path.join(temp_source_dir, "test.xml"), "w") as file:
xml_input = """<?xml version="1.0" encoding="UTF-8"?>
<test>
<message>Hello World!</message>
</test>
"""
file.write(xml_input)
with TemporaryDirectory(dir="./") as temp_target_dir:
transform.transform_xml_to_json(temp_source_dir, temp_target_dir)
with open(os.path.join(temp_target_dir, "test.json")) as file:
json_output = json.load(file)
assert json_output == {"test": {"message": "Hello World!"}}
def test_parse_stakeholder_org_hidden_in_person() -> None:
    """A "Natuerliche_Person" without first name is parsed as a company relationship."""
    person = {
        "Voller_Name": {"Vorname": None, "Nachname": "Some Company KG"},
        "Anschrift": {"Ort": "Area 51"},
    }
    data = {
        "Beteiligter": {"Natuerliche_Person": person},
        "Rolle": {"Rollenbezeichnung": {"content": "Kommanditist(in)"}},
    }

    expected_result = CompanyToCompanyRelationship(
        role=RelationshipRoleEnum.KOMMANDITIST,  # type: ignore
        name="Some Company KG",
        type=CompanyRelationshipEnum.COMPANY,
        location=Location(city="Area 51"),
    )
    result = transform.parse_stakeholder(data)
    assert result == expected_result
def test_parse_stakeholder_person() -> None:
    """A "Natuerliche_Person" with first and last name is parsed as a person relationship."""
    person = {
        "Voller_Name": {"Vorname": "Stephen", "Nachname": "King"},
        "Anschrift": {"Ort": "Maine"},
        "Geburt": {"Geburtsdatum": "1947-09-21"},
    }
    data = {
        "Beteiligter": {"Natuerliche_Person": person},
        "Rolle": {"Rollenbezeichnung": {"content": "Geschäftsleiter(in)"}},
    }

    expected_result = PersonToCompanyRelationship(
        role=RelationshipRoleEnum.GESCHAEFTSLEITER,  # type: ignore
        date_of_birth="1947-09-21",
        name=PersonName(firstname="Stephen", lastname="King"),
        type=CompanyRelationshipEnum.PERSON,
        location=Location(city="Maine"),
    )
    result = transform.parse_stakeholder(data)
    assert result == expected_result
def test_parse_stakeholder_org() -> None:
    """An "Organisation" entry is parsed as a company relationship including its full address."""
    organisation = {
        "Bezeichnung": {"Bezeichnung_Aktuell": "Transparenzregister kG"},
        "Anschrift": {
            "Ort": "Iserlohn",
            "Strasse": "Hauptstrasse",
            "Hausnummer": "42",
            "Postleitzahl": "58636",
        },
        "Geburt": {"Geburtsdatum": "1947-09-21"},
    }
    data = {
        "Beteiligter": {"Organisation": organisation},
        "Rolle": {"Rollenbezeichnung": {"content": "Geschäftsführender Direktor"}},
    }

    expected_result = CompanyToCompanyRelationship(
        name="Transparenzregister kG",
        role=RelationshipRoleEnum.DIREKTOR,  # type: ignore
        type=CompanyRelationshipEnum.COMPANY,
        location=Location(
            city="Iserlohn",
            zip_code="58636",
            house_number="42",
            street="Hauptstrasse",
        ),
    )
    result = transform.parse_stakeholder(data)
    assert result == expected_result
def test_parse_stakeholder_no_result() -> None:
    """An empty "Beteiligter" yields no relationship."""
    empty_participant: dict = {"Beteiligter": {}}
    result = transform.parse_stakeholder(empty_participant)
    assert result is None
def test_loc_from_beteiligung() -> None:
    """The "Anschrift" of the first "Beteiligung" entry is mapped to a Location."""
    organisation = {
        "Bezeichnung": {
            "Bezeichnung_Aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
        },
        "Sitz": {
            "Ort": "Heroldstatt",
            "Staat": {
                "@xsi:type": "WL_Staaten",
                "@wl_version": "1.5",
                "@wl_fassung": "2",
                "content": "DE",
            },
        },
        "Anschrift": {
            "Strasse": "Gewerbestraße",
            "Hausnummer": "8",
            "Postleitzahl": "72535",
            "Ort": "Heroldstatt",
        },
    }
    participant = {
        "Beteiligter": {"Beteiligtennummer": "1", "Organisation": organisation}
    }
    data = {
        "XJustiz_Daten": {
            "Grunddaten": {"Verfahrensdaten": {"Beteiligung": [participant]}}
        }
    }

    expected_result = Location(
        city="Heroldstatt", house_number="8", street="Gewerbestraße", zip_code="72535"
    )
    result = transform.loc_from_beteiligung(data)
    assert result == expected_result
def test_name_from_beteiligung() -> None:
    """The current name of the first "Beteiligung" entry is returned."""
    expected_result = "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
    participant = {
        "Beteiligter": {
            "Beteiligtennummer": "1",
            "Organisation": {
                "Bezeichnung": {
                    "Bezeichnung_Aktuell": "1 A Autenrieth Kunststofftechnik GmbH & Co. KG"
                },
            },
        }
    }
    data = {
        "XJustiz_Daten": {
            "Grunddaten": {"Verfahrensdaten": {"Beteiligung": [participant]}}
        }
    }
    result = transform.name_from_beteiligung(data)
    assert result == expected_result
def test_map_rechtsform() -> None:
    """An explicit "Rechtsform" entry is used even when the company name is empty."""
    register = {
        "Aktuelles_Satzungsdatum": "1952-07-15",
        "Rechtstraeger": {
            "Rechtsform": {"content": "Gesellschaft mit beschränkter Haftung"},
        },
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Basisdaten_Register": register}}}

    result = transform.map_rechtsform("", data)
    assert result == "Gesellschaft mit beschränkter Haftung"
def test_map_rechtsform_from_name() -> None:
    """Without export data the company type is derived from the name suffix, if any."""
    expectations = {
        "GEA Farm Technologies GmbH": "Gesellschaft mit beschränkter Haftung",
        "Atos SE": "Europäische Aktiengesellschaft (SE)",
        "Bilkenroth KG": "Kommanditgesellschaft",
        "jfoiahfo8sah 98548902 öhz ö": None,
    }

    for company_name, expected_result in expectations.items():
        result = transform.map_rechtsform(company_name, {})
        assert result == expected_result
def test_map_capital_kg_single() -> None:
    """A single "Daten_Kommanditist" dict of a KG yields its Hafteinlage as capital."""
    expected = Capital(
        currency=CurrencyEnum.EURO, value=69000, type=CapitalTypeEnum.HAFTEINLAGE  # type: ignore
    )
    limited_partner = {
        "Hafteinlage": {
            "Zahl": str(expected.value),
            "Waehrung": expected.currency,
        },
    }
    additions = {
        "Personengesellschaft": {"Zusatz_KG": {"Daten_Kommanditist": limited_partner}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.KG) == expected  # type: ignore
def test_map_capital_kg_sum() -> None:
    """A list of "Daten_Kommanditist" entries of a KG is summed up to one capital."""
    expected = Capital(
        currency=CurrencyEnum.EURO, value=20000, type=CapitalTypeEnum.HAFTEINLAGE  # type: ignore
    )
    limited_partners = [
        {"Hafteinlage": {"Zahl": str(10000), "Waehrung": expected.currency}},
        {"Hafteinlage": {"Zahl": str(10000), "Waehrung": expected.currency}},
    ]
    additions = {
        "Personengesellschaft": {"Zusatz_KG": {"Daten_Kommanditist": limited_partners}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.KG) == expected  # type: ignore
def test_map_capital_no_fachdaten() -> None:
    """An export without "Zusatzangaben" has no capital."""
    export_without_additions: dict = {"XJustiz_Daten": {"Fachdaten_Register": {}}}
    assert transform.map_capital(export_without_additions, CompanyTypeEnum.KG) is None  # type: ignore
def test_map_capital_gmbh() -> None:
    """For a GmbH the "Stammkapital" below "Kapitalgesellschaft" is used."""
    expected = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {"Zahl": str(expected.value), "Waehrung": expected.currency}
    additions = {
        "Kapitalgesellschaft": {"Zusatz_GmbH": {"Stammkapital": share_capital}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.GMBH) == expected  # type: ignore
def test_map_capital_ag() -> None:
    """For an SE the "Grundkapital" of "Zusatz_Aktiengesellschaft" is used."""
    expected = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.GRUNDKAPITAL  # type: ignore
    )
    amount = {"Zahl": str(expected.value), "Waehrung": expected.currency}
    additions = {
        "Kapitalgesellschaft": {
            "Zusatz_Aktiengesellschaft": {"Grundkapital": {"Hoehe": amount}}
        }
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.SE) == expected  # type: ignore
def test_map_capital_personengesellschaft() -> None:
    """Without "Kapitalgesellschaft" the capital of an OHG is read from "Personengesellschaft"."""
    expected = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {"Zahl": str(expected.value), "Waehrung": expected.currency}
    additions = {
        "Personengesellschaft": {"Zusatz_GmbH": {"Stammkapital": share_capital}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.OHG) == expected  # type: ignore
def test_map_capital_einzelkaufmann() -> None:
    """For an Einzelkaufmann no capital is returned even if capital data is present."""
    capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {"Zahl": str(capital.value), "Waehrung": capital.currency}
    additions = {
        "Personengesellschaft": {"Zusatz_GmbH": {"Stammkapital": share_capital}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.EINZELKAUFMANN) is None  # type: ignore
def test_map_capital_partial_null_values() -> None:
    """A capital entry whose "Zahl" is null is treated as no capital."""
    capital = Capital(
        currency=CurrencyEnum.DEUTSCHE_MARK, value=42, type=CapitalTypeEnum.STAMMKAPITAL  # type: ignore
    )
    share_capital = {"Zahl": None, "Waehrung": capital.currency}
    additions = {
        "Personengesellschaft": {"Zusatz_GmbH": {"Stammkapital": share_capital}}
    }
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Zusatzangaben": additions}}}

    assert transform.map_capital(data, CompanyTypeEnum.OHG) is None  # type: ignore
def test_map_business_purpose() -> None:
    """The "Gegenstand_oder_Geschaeftszweck" entry is returned unchanged."""
    business_purpose = "Handel mit Betäubungsmitteln aller Art"
    register = {"Gegenstand_oder_Geschaeftszweck": business_purpose}
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Basisdaten_Register": register}}}

    assert transform.map_business_purpose(data) == business_purpose
def test_map_business_purpose_no_result() -> None:
    """Missing keys on the path lead to None instead of an exception."""
    incomplete_export: dict = {"XJustiz_Daten": {}}
    assert transform.map_business_purpose(incomplete_export) is None
def test_map_founding_date_from_tag_der_ersten_eintragung() -> None:
    """A "Tag der ersten Eintragung" mention is found and converted to ISO format."""
    data = {
        "some entry": "Tag der ersten Eintragung: 01.05.2004",
        "some other entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
    }
    assert transform.map_founding_date(data) == "2004-05-01"
def test_map_founding_date_from_gesellschaftsvertrag() -> None:
    """A "Gesellschaftsvertrag vom" mention is found and converted to ISO format."""
    data = {
        "some entry": "hfjdoöiashföahöf iodsazo8 5z4o fdsha8oü gfdsö",
        "some other entry": "Das Wesen der Rekursion ist der Selbstaufruf Gesellschaftsvertrag vom 22.12.1996 Hallo Welt",
    }
    assert transform.map_founding_date(data) == "1996-12-22"
def test_map_founding_date_from_gruendungsdatum() -> None:
    """Without a text mention the structured "Gruendungsdatum" is returned as is."""
    register = {"Gruendungsmetadaten": {"Gruendungsdatum": "1998-01-01"}}
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Basisdaten_Register": register}}}

    assert transform.map_founding_date(data) == "1998-01-01"
def test_map_founding_date_no_result() -> None:
    """An export without any date source yields None."""
    export_without_dates: dict = {
        "XJustiz_Daten": {"Fachdaten_Register": {"Basisdaten_Register": {}}}
    }
    assert transform.map_founding_date(export_without_dates) is None
def test_map_company_id() -> None:
    """File number and the second "Beteiligung" entry are combined into a CompanyID."""
    district_court = DistrictCourt("Amtsgericht Ulm", "Ulm")
    company_id = CompanyID(district_court, "HRA 4711")

    court_participant = {
        "Beteiligter": {
            "Organisation": {
                "Bezeichnung": {"Bezeichnung_Aktuell": district_court.name},
                "Sitz": {"Ort": district_court.city},
            }
        },
    }
    proceedings = {
        "Instanzdaten": {"Aktenzeichen": company_id.hr_number},
        "Beteiligung": [{}, court_participant],
    }
    data = {
        "XJustiz_Daten": {
            "Grunddaten": {
                "@XJustizVersion": "1.20.0",
                "Verfahrensdaten": proceedings,
            },
        }
    }

    assert transform.map_company_id(data) == company_id
def test_map_last_update() -> None:
    """The "letzte_Eintragung" value is returned unchanged."""
    last_entry = "2024-01-01"
    excerpt = {"letzte_Eintragung": last_entry}
    data = {"XJustiz_Daten": {"Fachdaten_Register": {"Auszug": excerpt}}}

    assert transform.map_last_update(data) == last_entry
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_company_id"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.name_from_beteiligung"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.loc_from_beteiligung"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_last_update"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_rechtsform"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_capital"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_business_purpose"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.map_founding_date"
)
@patch(
    "aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.parse_stakeholder"
)
def test_map_unternehmensregister_json(  # noqa: PLR0913
    mock_map_parse_stakeholder: Mock,
    mock_map_founding_date: Mock,
    mock_map_business_purpose: Mock,
    mock_map_capital: Mock,
    mock_map_rechtsform: Mock,
    mock_map_last_update: Mock,
    mock_loc_from_beteiligung: Mock,
    mock_map_name_from_beteiligung: Mock,
    mock_map_company_id: Mock,
) -> None:
    """The Company is assembled from the return values of the (patched) helper functions."""
    # patch decorators are applied bottom-up, so the mock arguments arrive in
    # reverse order of the decorator list above.
    expected_result = Company(  # type: ignore
        id=Mock(),
        name=Mock(),
        location=Mock(),
        last_update=Mock(),
        company_type=Mock(),
        capital=Mock(),
        business_purpose=Mock(),
        founding_date=Mock(),
        relationships=[Mock()],
    )

    # Every helper returns the matching attribute of the expected company
    mock_map_parse_stakeholder.return_value = expected_result.relationships[0]
    mock_map_founding_date.return_value = expected_result.founding_date
    mock_map_business_purpose.return_value = expected_result.business_purpose
    mock_map_capital.return_value = expected_result.capital
    mock_map_rechtsform.return_value = expected_result.company_type
    mock_map_last_update.return_value = expected_result.last_update
    mock_loc_from_beteiligung.return_value = expected_result.location
    mock_map_name_from_beteiligung.return_value = expected_result.name
    mock_map_company_id.return_value = expected_result.id

    # Three entries: only the third one is passed to parse_stakeholder
    participants: list = [{}, {}, {}]
    data: dict = {
        "XJustiz_Daten": {"Grunddaten": {"Verfahrensdaten": {"Beteiligung": participants}}}
    }

    assert transform.map_unternehmensregister_json(data) == expected_result

View File

@ -3,7 +3,12 @@ from unittest.mock import Mock
import pytest
from aki_prj23_transparenzregister.models.company import Company, CompanyID, Location
from aki_prj23_transparenzregister.models.company import (
Company,
CompanyID,
DistrictCourt,
Location,
)
from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
CompanyMongoService,
)
@ -73,7 +78,8 @@ def test_by_id_no_result(mock_mongo_connector: Mock, mock_collection: Mock) -> N
mock_mongo_connector.database = {"companies": mock_collection}
service = CompanyMongoService(mock_mongo_connector)
mock_collection.find.return_value = []
assert service.get_by_id("Does not exist") is None
id = CompanyID(DistrictCourt("a", "b"), "c").to_dict()
assert service.get_by_id(id) is None
def test_by_id_result(mock_mongo_connector: Mock, mock_collection: Mock) -> None:
@ -81,13 +87,14 @@ def test_by_id_result(mock_mongo_connector: Mock, mock_collection: Mock) -> None
Args:
mock_mongo_connector (Mock): Mocked MongoConnector library
mock_collection (Mock): Mocked pymongo collection
mock_collection (Mock): Mocked pymongo collection.
"""
mock_mongo_connector.database = {"companies": mock_collection}
service = CompanyMongoService(mock_mongo_connector)
mock_entry = {"id": "Does exist", "vaue": 42}
mock_collection.find.return_value = [mock_entry]
assert service.get_by_id("Does exist") == mock_entry
id = CompanyID(DistrictCourt("a", "b"), "c").to_dict()
assert service.get_by_id(id) == mock_entry
def test_insert(mock_mongo_connector: Mock, mock_collection: Mock) -> None:
@ -103,7 +110,7 @@ def test_insert(mock_mongo_connector: Mock, mock_collection: Mock) -> None:
mock_collection.insert_one.return_value = mock_result
assert (
service.insert(
Company(CompanyID("", ""), Location("Hier und Dort"), "", "", [])
Company(CompanyID("", ""), Location("Hier und Dort"), "", "", []) # type: ignore
)
== mock_result
)

View File

@ -33,3 +33,15 @@ def test_simplify_string_type_error(value: Any) -> None:
"""Tests if the type error is thrown when the value is the wrong type."""
with pytest.raises(TypeError):
assert string_tools.simplify_string(value)
@pytest.mark.parametrize(
    ("value", "expected"),
    [
        ("10.10.1111", "1111-10-10"),
        ("10.10.98", "1998-10-10"),
    ],
)
def test_transform_date_to_iso(value: str, expected: str) -> None:
    """Dates with four- and two-digit years are converted to `YYYY-MM-DD`."""
    assert string_tools.transform_date_to_iso(value) == expected