mirror of
https://github.com/fhswf/aki_prj23_transparenzregister.git
synced 2025-06-22 04:43:54 +02:00
refactor: Apply linter feedback
This commit is contained in:
@ -14,7 +14,17 @@ from aki_prj23_transparenzregister.utils.mongo.connector import (
|
|||||||
MongoConnector,
|
MongoConnector,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int:
|
def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int:
|
||||||
|
"""Load all json files in a directory to MongoDB company collection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base_path (str): Directory to scan
|
||||||
|
service (CompanyMongoService): MongoDB service
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: Number of processed files
|
||||||
|
"""
|
||||||
num_processed = 0
|
num_processed = 0
|
||||||
for file in tqdm(glob.glob1(base_path, "*.json")):
|
for file in tqdm(glob.glob1(base_path, "*.json")):
|
||||||
path = os.path.join(base_path, file)
|
path = os.path.join(base_path, file)
|
||||||
@ -26,10 +36,14 @@ def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int
|
|||||||
num_processed += 1
|
num_processed += 1
|
||||||
return num_processed
|
return num_processed
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
provider = JsonFileConfigProvider("secrets.json")
|
provider = JsonFileConfigProvider("secrets.json")
|
||||||
conn_string = provider.get_mongo_connection_string()
|
conn_string = provider.get_mongo_connection_string()
|
||||||
connector = MongoConnector(conn_string)
|
connector = MongoConnector(conn_string)
|
||||||
service = CompanyMongoService(connector)
|
service = CompanyMongoService(connector)
|
||||||
|
|
||||||
load_directory_to_mongo("./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed", service)
|
load_directory_to_mongo(
|
||||||
|
"./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed",
|
||||||
|
service,
|
||||||
|
)
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
"""Transform Unternehmensregister data to Transparenzregister API."""
|
||||||
|
@ -3,16 +3,21 @@ import dataclasses
|
|||||||
import glob
|
import glob
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import sys
|
import sys
|
||||||
|
import typing
|
||||||
|
|
||||||
import xmltodict
|
import xmltodict
|
||||||
from tqdm import tqdm
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import v1
|
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import v3
|
|
||||||
from aki_prj23_transparenzregister.models.company import Company
|
from aki_prj23_transparenzregister.models.company import Company
|
||||||
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import (
|
||||||
|
v1,
|
||||||
|
)
|
||||||
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import (
|
||||||
|
v3,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
|
def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
|
||||||
"""Convert all xml files in a directory to json files.
|
"""Convert all xml files in a directory to json files.
|
||||||
@ -36,13 +41,27 @@ def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(e)
|
logger.error(e)
|
||||||
|
|
||||||
def determine_version(data: dict):
|
|
||||||
|
def determine_version(data: dict) -> typing.Any:
|
||||||
|
"""Determine Unternehmensregister data API version of given entry.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Unternehmensregister data
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If version could not be determined
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
module: Version module
|
||||||
|
"""
|
||||||
if "XJustiz_Daten" in data:
|
if "XJustiz_Daten" in data:
|
||||||
|
# TODO consider class inheritance for version modules
|
||||||
return v1
|
return v1
|
||||||
elif "tns:nachrichtenkopf" in data[list(data.keys())[0]]:
|
if "tns:nachrichtenkopf" in data[list(data.keys())[0]]:
|
||||||
return v3
|
return v3
|
||||||
raise ValueError("Could not determine Unternehmensregister version.")
|
raise ValueError("Could not determine Unternehmensregister version.")
|
||||||
|
|
||||||
|
|
||||||
def map_unternehmensregister_json(data: dict) -> Company:
|
def map_unternehmensregister_json(data: dict) -> Company:
|
||||||
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
"""Processes the Unternehmensregister structured export to a Company by using several helper methods.
|
||||||
|
|
||||||
@ -57,8 +76,6 @@ def map_unternehmensregister_json(data: dict) -> Company:
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
|
base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
|
||||||
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
|
for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
|
||||||
path = os.path.join(f"{base_path}/export", file)
|
path = os.path.join(f"{base_path}/export", file)
|
||||||
|
@ -1,13 +1,5 @@
|
|||||||
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
|
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
|
||||||
import dataclasses
|
|
||||||
import glob
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
import sys
|
|
||||||
|
|
||||||
import xmltodict
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.models.company import (
|
from aki_prj23_transparenzregister.models.company import (
|
||||||
Capital,
|
Capital,
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
"""Transforms data from the Unternehmensregister v3 API to the data model of the Transparenzregister API."""
|
||||||
|
@ -1,34 +1,60 @@
|
|||||||
|
"""RoleMapper for Unternehmensregister v3 API."""
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import xmltodict
|
import xmltodict
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
from aki_prj23_transparenzregister.models.company import RelationshipRoleEnum
|
from aki_prj23_transparenzregister.models.company import RelationshipRoleEnum
|
||||||
|
|
||||||
|
|
||||||
class RoleMapper:
|
class RoleMapper:
|
||||||
|
"""RoleMapper for Unternehmensregister v3 API."""
|
||||||
|
|
||||||
singleton = None
|
singleton = None
|
||||||
def __init__(self):
|
|
||||||
# TODO Automated file retrieval
|
def __init__(self) -> None:
|
||||||
|
"""Initialize RoleMapper by ingesting XSD schema file."""
|
||||||
|
# TODO Automated file retrieval
|
||||||
base_path = os.path.dirname(Path(__file__))
|
base_path = os.path.dirname(Path(__file__))
|
||||||
path = os.path.join(base_path, "assets", "xjustiz_0040_cl_rollenbezeichnung_3_3.xsd")
|
path = os.path.join(
|
||||||
|
base_path, "assets", "xjustiz_0040_cl_rollenbezeichnung_3_3.xsd"
|
||||||
|
)
|
||||||
with open(path, encoding="utf-8") as file:
|
with open(path, encoding="utf-8") as file:
|
||||||
content = file.read()
|
content = file.read()
|
||||||
data = xmltodict.parse(content)
|
data = xmltodict.parse(content)
|
||||||
|
|
||||||
mapping = {}
|
mapping = {}
|
||||||
for entry in data["xs:schema"]["xs:simpleType"]["xs:restriction"]["xs:enumeration"]:
|
for entry in data["xs:schema"]["xs:simpleType"]["xs:restriction"][
|
||||||
mapping[entry['@value']] = entry['xs:annotation']['xs:appinfo']['wert']
|
"xs:enumeration"
|
||||||
|
]:
|
||||||
|
mapping[entry["@value"]] = entry["xs:annotation"]["xs:appinfo"]["wert"]
|
||||||
self.dictionary = mapping
|
self.dictionary = mapping
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def mapper():
|
def mapper() -> "RoleMapper":
|
||||||
|
"""Singleton getter for RoleMapper.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
RoleMapper: Singleton instance
|
||||||
|
"""
|
||||||
if RoleMapper.singleton is None:
|
if RoleMapper.singleton is None:
|
||||||
RoleMapper.singleton = RoleMapper()
|
RoleMapper.singleton = RoleMapper()
|
||||||
return RoleMapper.singleton
|
return RoleMapper.singleton
|
||||||
|
|
||||||
def get(self, key: str) -> RelationshipRoleEnum:
|
def get(self, key: str) -> RelationshipRoleEnum:
|
||||||
|
"""Get mapped value for given key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key (str): Key to map
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
RelationshipRoleEnum: Mapped value
|
||||||
|
"""
|
||||||
return RelationshipRoleEnum(self.dictionary[key])
|
return RelationshipRoleEnum(self.dictionary[key])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
mapper = RoleMapper()
|
mapper = RoleMapper()
|
||||||
print(mapper.get("201"))
|
logger.info(f"Mapped value for role 201 - {mapper.get('201')}")
|
||||||
|
@ -1,13 +1,8 @@
|
|||||||
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
|
"""Transform raw Unternehmensregister export (*.xml) to processed .json files for loading."""
|
||||||
import dataclasses
|
|
||||||
import glob
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
|
|
||||||
import xmltodict
|
import re
|
||||||
from tqdm import tqdm
|
import typing
|
||||||
|
from collections.abc import Sequence
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.models.company import (
|
from aki_prj23_transparenzregister.models.company import (
|
||||||
Capital,
|
Capital,
|
||||||
@ -25,15 +20,14 @@ from aki_prj23_transparenzregister.models.company import (
|
|||||||
PersonToCompanyRelationship,
|
PersonToCompanyRelationship,
|
||||||
RelationshipRoleEnum,
|
RelationshipRoleEnum,
|
||||||
)
|
)
|
||||||
|
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import (
|
||||||
|
RoleMapper,
|
||||||
|
)
|
||||||
from aki_prj23_transparenzregister.utils.string_tools import (
|
from aki_prj23_transparenzregister.utils.string_tools import (
|
||||||
remove_traling_and_leading_quotes,
|
remove_traling_and_leading_quotes,
|
||||||
transform_date_to_iso,
|
transform_date_to_iso,
|
||||||
)
|
)
|
||||||
|
|
||||||
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3.role_mapper import (
|
|
||||||
RoleMapper,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_date_of_birth(data: dict) -> str | None:
|
def parse_date_of_birth(data: dict) -> str | None:
|
||||||
"""Retreives the date of birth from a stakeholder entry if possible.
|
"""Retreives the date of birth from a stakeholder entry if possible.
|
||||||
@ -56,6 +50,14 @@ def parse_date_of_birth(data: dict) -> str | None:
|
|||||||
|
|
||||||
|
|
||||||
def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
|
def map_role_id_to_enum(role_id: str) -> RelationshipRoleEnum:
|
||||||
|
"""Map Unternehmensregister role ID to RelationshipRoleEnum.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
role_id (str): Unternehmensregister role ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
RelationshipRoleEnum: Role enum
|
||||||
|
"""
|
||||||
mapper = RoleMapper.mapper()
|
mapper = RoleMapper.mapper()
|
||||||
return mapper.get(role_id)
|
return mapper.get(role_id)
|
||||||
|
|
||||||
@ -229,10 +231,7 @@ def loc_from_beteiligung(data: dict) -> Location:
|
|||||||
# "tns:anschrift",
|
# "tns:anschrift",
|
||||||
]
|
]
|
||||||
base = traversal(data, base_path)
|
base = traversal(data, base_path)
|
||||||
if "tns:anschrift" in base:
|
base = base["tns:anschrift"] if "tns:anschrift" in base else base["tns:sitz"]
|
||||||
base = base["tns:anschrift"]
|
|
||||||
else:
|
|
||||||
base = base["tns:sitz"]
|
|
||||||
|
|
||||||
if isinstance(base, list):
|
if isinstance(base, list):
|
||||||
base = base[0]
|
base = base[0]
|
||||||
@ -318,7 +317,9 @@ def map_rechtsform(company_name: str, data: dict) -> CompanyTypeEnum | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
def map_capital( # noqa: PLR0912
|
||||||
|
data: dict, company_type: CompanyTypeEnum
|
||||||
|
) -> Capital | None:
|
||||||
"""Extracts the company capital from the given Unternehmensregister export.
|
"""Extracts the company capital from the given Unternehmensregister export.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -332,7 +333,11 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
|
|||||||
if "tns:auswahl_zusatzangaben" not in data["tns:fachdatenRegister"]:
|
if "tns:auswahl_zusatzangaben" not in data["tns:fachdatenRegister"]:
|
||||||
return None
|
return None
|
||||||
capital: dict = {"tns:zahl": 0.0, "tns:waehrung": {"code": None}}
|
capital: dict = {"tns:zahl": 0.0, "tns:waehrung": {"code": None}}
|
||||||
if company_type == CompanyTypeEnum.KG and "tns:personengesellschaft" in data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"]:
|
if (
|
||||||
|
company_type == CompanyTypeEnum.KG
|
||||||
|
and "tns:personengesellschaft"
|
||||||
|
in data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"]
|
||||||
|
):
|
||||||
capital_type = "Hafteinlage"
|
capital_type = "Hafteinlage"
|
||||||
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
|
base = data["tns:fachdatenRegister"]["tns:auswahl_zusatzangaben"][
|
||||||
"tns:personengesellschaft"
|
"tns:personengesellschaft"
|
||||||
@ -475,17 +480,40 @@ def map_founding_date(data: dict) -> str | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def traversal(data: dict, path: list[str | int]) -> any:
|
def traversal(data: dict, path: Sequence[str | int | object]) -> typing.Any:
|
||||||
|
"""Traverse a dict using list of keys.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
path (Sequence[str | int | object]): List of keys
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KeyError: If key not found
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
any: Value at the end of the path
|
||||||
|
"""
|
||||||
current = data
|
current = data
|
||||||
for key in path:
|
for key in path:
|
||||||
try:
|
try:
|
||||||
current = current[key]
|
current = current[key]
|
||||||
except:
|
except KeyError as e:
|
||||||
raise KeyError(f"Key {key} not found")
|
raise KeyError(f"Key {key} not found") from e
|
||||||
return current
|
return current
|
||||||
|
|
||||||
|
|
||||||
def map_hr_number(data: dict) -> str:
|
def map_hr_number(data: dict) -> str:
|
||||||
|
"""Extract the HR number from a given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KeyError: If key not found
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: HR number
|
||||||
|
"""
|
||||||
base = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
|
base = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:instanzdaten"][
|
||||||
"tns:aktenzeichen"
|
"tns:aktenzeichen"
|
||||||
]["tns:auswahl_aktenzeichen"]
|
]["tns:auswahl_aktenzeichen"]
|
||||||
@ -493,12 +521,20 @@ def map_hr_number(data: dict) -> str:
|
|||||||
hr_prefix = base["tns:aktenzeichen.strukturiert"]["tns:register"]["code"]
|
hr_prefix = base["tns:aktenzeichen.strukturiert"]["tns:register"]["code"]
|
||||||
hr_number = base["tns:aktenzeichen.strukturiert"]["tns:laufendeNummer"]
|
hr_number = base["tns:aktenzeichen.strukturiert"]["tns:laufendeNummer"]
|
||||||
return f"{hr_prefix} {hr_number}"
|
return f"{hr_prefix} {hr_number}"
|
||||||
elif "tns:aktenzeichen.freitext" in base:
|
if "tns:aktenzeichen.freitext" in base:
|
||||||
return base["tns:aktenzeichen.freitext"]
|
return base["tns:aktenzeichen.freitext"]
|
||||||
return hr_full
|
raise KeyError("Could not find HR number")
|
||||||
|
|
||||||
|
|
||||||
def map_district_court(data: dict) -> DistrictCourt:
|
def map_district_court(data: dict) -> DistrictCourt:
|
||||||
|
"""Extract the district court from a given Unternehmensregister export.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (dict): Data export
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DistrictCourt: District court
|
||||||
|
"""
|
||||||
base_path = [
|
base_path = [
|
||||||
"tns:grunddaten",
|
"tns:grunddaten",
|
||||||
"tns:verfahrensdaten",
|
"tns:verfahrensdaten",
|
||||||
@ -525,11 +561,13 @@ def map_company_id(data: dict) -> CompanyID:
|
|||||||
CompanyID: ID of the company
|
CompanyID: ID of the company
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return CompanyID(
|
return CompanyID(map_hr_number(data), map_district_court(data)) # type: ignore
|
||||||
**{"hr_number": map_hr_number(data), "district_court": map_district_court(data)}
|
|
||||||
)
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][0]["tns:beteiligter"]["tns:auswahl_beteiligter"]["tns:organisation"]["tns:registereintragung"]["tns:registernummer"]
|
hr_number = data["tns:grunddaten"]["tns:verfahrensdaten"]["tns:beteiligung"][0][
|
||||||
|
"tns:beteiligter"
|
||||||
|
]["tns:auswahl_beteiligter"]["tns:organisation"]["tns:registereintragung"][
|
||||||
|
"tns:registernummer"
|
||||||
|
]
|
||||||
district_court = map_district_court(data)
|
district_court = map_district_court(data)
|
||||||
return CompanyID(hr_number=hr_number, district_court=district_court)
|
return CompanyID(hr_number=hr_number, district_court=district_court)
|
||||||
|
|
||||||
|
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user