diff --git a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
index f8a7b5b..3f0f941 100644
--- a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
+++ b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
@@ -5,6 +5,7 @@ import glob
import argparse
import tempfile
import dataclasses
+import multiprocessing
+from loguru import logger
import pandas as pd
from tqdm import tqdm
from pathlib import Path
@@ -21,11 +22,76 @@ from aki_prj23_transparenzregister.utils.logger_config import (
from aki_prj23_transparenzregister.utils.sql import connector
from aki_prj23_transparenzregister.utils.sql import entities
+from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
+from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
+ CompanyMongoService,
+)
+
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
extract,
load,
)
-from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import main as transform
+from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
+ main as transform,
+)
+
+def work(company: entities.Company, config_provider) -> None:
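+    """Scrape a single missing company, transform the downloads and load them
+    into MongoDB, then mark the company as searched for in the SQL database.
+    """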
+ with tempfile.TemporaryDirectory() as tmp_dir:
+        xml_dir = os.path.join(tmp_dir, "xml")
+ os.makedirs(xml_dir, exist_ok=True)
+ try:
+            extract.scrape(company.name, xml_dir, full_match=True)
+ except Exception as e:
+ logger.error(e)
+ return
+        output_path = os.path.join(tmp_dir, "transformed")
+        os.makedirs(output_path, exist_ok=True)
+        json_dir = os.path.join(tmp_dir, "json")
+ os.makedirs(json_dir, exist_ok=True)
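+        # Convert the scraped XML filings into raw JSON documents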
+        transform.transform_xml_to_json(xml_dir, json_dir)
+
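+        # Map each raw JSON filing onto the Company model and write it out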
+ for file in tqdm(glob.glob1(json_dir, "*.json")):
+ try:
+ path = os.path.join(json_dir, file)
+ with open(path, encoding="utf-8") as file_object:
+ company_mapped = transform.map_unternehmensregister_json(
+ json.loads(file_object.read())
+ )
+
+ name = "".join(e for e in company_mapped.name if e.isalnum())[:50]
+
+ with open(
+ os.path.join(output_path, f"{name}.json"),
+ "w+",
+ encoding="utf-8",
+ ) as export_file:
+ json.dump(
+ dataclasses.asdict(company_mapped), export_file, ensure_ascii=False
+ )
+            except Exception as e:
+                logger.error(f"Error processing {file}: {e}")
+                return
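+        # Load the transformed documents into MongoDB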
+        mongo_connector = MongoConnector(
+            config_provider.get_mongo_connection_string()
+        )
+        company_mongo_service = CompanyMongoService(mongo_connector)
+        num_processed = load.load_directory_to_mongo(
+            output_path, company_mongo_service
+        )
+        mongo_connector.client.close()
+
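+        # Mark the company as searched for so it is not scraped again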
+        try:
+            if num_processed > 0:
+                with connector.get_session(config_provider) as session:
+                    missing_company = (
+                        session.query(entities.MissingCompany)
+                        .where(entities.MissingCompany.name == company.name)
+                        .first()
+                    )
+                    if missing_company is not None:
+                        missing_company.searched_for = True
+                        session.commit()
+                        logger.info(f"Processed {company.name}")
+        except Exception as e:
+            logger.error(f"Failed to mark {company.name} as searched for: {e}")
+
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -43,44 +109,29 @@ if __name__ == "__main__":
parsed = parser.parse_args(sys.argv[1:])
configer_logger(namespace=parsed)
config = parsed.config
- # session = connector.get_session(get_config_provider(config))
- # missing_companies = session.query(entities.MissingCompany).all()
+    config_provider = get_config_provider(config)
+    session = connector.get_session(config_provider)
- counter = 0
- # # Scrape data from unternehmensregister
- # for company in missing_companies:
- # print(company.name)
- # extract.scrape(company.name, ["tmp", "xml"])
-
- # Transform input
- output_path = os.path.join(str(Path.cwd()), *["tmp", "transformed"])
- xml_dir = os.path.join(str(Path.cwd()), *["tmp", "xml"])
- json_dir = os.path.join(str(Path.cwd()), *["tmp", "json"])
- transform.transform_xml_to_json(
- os.path.join(xml_dir),
- os.path.join(json_dir),
+    company_mongo_service = CompanyMongoService(
+        MongoConnector(config_provider.get_mongo_connection_string())
)
- for file in tqdm(glob.glob1(json_dir, "*.json")):
- path = os.path.join(json_dir, file)
- with open(path, encoding="utf-8") as file_object:
- # try:
- print(path)
- company = transform.map_unternehmensregister_json(
- json.loads(file_object.read())
- )
- name = "".join(e for e in company.name if e.isalnum())[:50]
+    missing_companies = (
+        session.query(entities.MissingCompany)
+        .where(entities.MissingCompany.searched_for.is_(False))
+        .all()
+    )
- with open(
- f"{output_path}/{name}.json",
- "w+",
- encoding="utf-8",
- ) as export_file:
- json.dump(
- dataclasses.asdict(company), export_file, ensure_ascii=False
- )
- # except Exception as e:
- # logger.error(e.with_traceback())
- # logger.error(e)
- # logger.error(f"Error in processing {path}")
- # sys.exit(1)
\ No newline at end of file
+    num_workers = 5
+    pool = multiprocessing.Pool(processes=num_workers)
+ # Scrape data from unternehmensregister
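+    # Each worker drives its own headless Chrome instance, so keep the pool small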
+    params = [(company, config_provider) for company in missing_companies]
+    # Map the work function over the parameter list using the pool
+ pool.starmap(work, params)
+
+ # Close the Pool to prevent any more tasks from being submitted
+ pool.close()
+
+ # Wait for all the processes to complete
+ pool.join()
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
index efff716..1343566 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
@@ -13,15 +13,15 @@ from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
-def scrape(query: str, download_dir: list[str]) -> None:
+def scrape(query: str, download_dir: str, full_match: bool = False) -> None:
"""Fetch results from Unternehmensregister for given query.
Args:
query (str): Search Query (RegEx supported)
-        download_dir (list[str]): Directory to place output files in
+        download_dir (str): Directory to place output files in
+        full_match (bool): Only fetch results whose name exactly matches the query
     """
- download_path = os.path.join(str(Path.cwd()), *download_dir)
- print(download_path)
+    download_path = download_dir
options = webdriver.ChromeOptions()
preferences = {
"profile.default_content_settings.popups": 0,
@@ -33,8 +33,9 @@ def scrape(query: str, download_dir: list[str]) -> None:
"default_directory": download_path,
},
}
- # options.add_argument("--headless=new")
+ options.add_argument("--headless=new")
options.add_experimental_option("prefs", preferences)
+ options.add_experimental_option("excludeSwitches", ["enable-logging"])
driver = webdriver.Chrome(options=options)
@@ -74,7 +75,7 @@ def scrape(query: str, download_dir: list[str]) -> None:
]
for index, company_link in enumerate(companies_tab):
company_name = company_names[index]
- if company_name in processed_companies:
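+        # With full_match, skip results whose name does not exactly match the query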
+        if company_name in processed_companies or (
+            full_match and company_name != query
+        ):
continue
# Go to intermediary page
company_link.click()
@@ -121,6 +122,8 @@ def scrape(query: str, download_dir: list[str]) -> None:
finally:
for _ in range(6):
driver.back()
+            if full_match and company_name == query:
+                break
driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click()
driver.close()
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py
index 621b723..4f58bf4 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py
@@ -14,17 +14,22 @@ from aki_prj23_transparenzregister.utils.mongo.connector import (
MongoConnector,
)
+def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int:
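+    """Load all transformed company JSON files from a directory into MongoDB.
+
+    Args:
+        base_path (str): Directory containing the transformed *.json files
+        service (CompanyMongoService): Service used to write the companies
+
+    Returns:
+        int: Number of files processed
+    """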
+ num_processed = 0
+ for file in tqdm(glob.glob1(base_path, "*.json")):
+ path = os.path.join(base_path, file)
+ with open(path, encoding="utf-8") as file_object:
+ data = json.loads(file_object.read())
+ company: Company = Company(**data)
+
+ service.migrations_of_base_data(company)
+ num_processed += 1
+ return num_processed
+
if __name__ == "__main__":
provider = JsonFileConfigProvider("secrets.json")
conn_string = provider.get_mongo_connection_string()
connector = MongoConnector(conn_string)
service = CompanyMongoService(connector)
- base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
- for file in tqdm(glob.glob1(f"{base_path}/transformed", "*.json")):
- path = os.path.join(f"{base_path}/transformed", file)
- with open(path, encoding="utf-8") as file_object:
- data = json.loads(file_object.read())
- company: Company = Company(**data)
-
- service.migrations_of_base_data(company)
+    load_directory_to_mongo(
+        "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed",
+        service,
+    )
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py
index 717c4d1..9025029 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py
@@ -8,6 +8,7 @@ import sys
import xmltodict
from tqdm import tqdm
+from loguru import logger
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import v1
from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import v3
@@ -26,12 +27,14 @@ def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
target_path = os.path.join(
target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json")
)
-
- with open(source_path, encoding="utf-8") as source_file:
- # deepcode ignore HandleUnicode: Weird XML format no other solution
- data = xmltodict.parse(source_file.read().encode())
- with open(target_path, "w", encoding="utf-8") as json_file:
- json_file.write(json.dumps(data))
+ try:
+ with open(source_path, encoding="utf-8") as source_file:
+ # deepcode ignore HandleUnicode: Weird XML format no other solution
+ data = xmltodict.parse(source_file.read().encode())
+ with open(target_path, "w", encoding="utf-8") as json_file:
+ json_file.write(json.dumps(data))
+        except Exception as e:
+            logger.error(f"Failed to transform {source_path}: {e}")
def determine_version(data: dict):
if "XJustiz_Daten" in data:
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py
index 95405cb..92164fa 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py
@@ -539,31 +539,3 @@ def map_unternehmensregister_json(data: dict) -> Company:
result["relationships"].append(people)
result = map_co_relation(result)
return Company(**result)
-
-
-if __name__ == "__main__":
- from loguru import logger
-
- base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
- for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
- path = os.path.join(f"{base_path}/export", file)
- with open(path, encoding="utf-8") as file_object:
- try:
- company: Company = map_unternehmensregister_json(
- json.loads(file_object.read())
- )
-
- name = "".join(e for e in company.name if e.isalnum())[:50]
-
- with open(
- f"{base_path}/transformed/{name}.json",
- "w+",
- encoding="utf-8",
- ) as export_file:
- json.dump(
- dataclasses.asdict(company), export_file, ensure_ascii=False
- )
- except Exception as e:
- logger.error(e)
- logger.error(f"Error in processing {path}")
- sys.exit(1)
\ No newline at end of file
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py
index 61c9371..f66dc69 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py
@@ -385,6 +385,8 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
]:
return None
# Catch entries having the dict but with null values
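+    # Some filings provide the capital entry as a list; use its first element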
+ if isinstance(capital, list):
+ capital = capital[0]
if not all(capital.values()):
return None
return Capital(
@@ -465,9 +467,10 @@ def map_founding_date(data: dict) -> str | None:
"tns:fachdatenRegister",
"tns:basisdatenRegister",
"tns:satzungsdatum",
- "tns:aktuellesSatzungsdatum",
]
- return traversal(data, path)
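+    # The current Satzungsdatum is nested one level deeper and may be absent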
+    base = traversal(data, path)
+    if base and "tns:aktuellesSatzungsdatum" in base:
+        return base["tns:aktuellesSatzungsdatum"]
# No reliable answer
return None
@@ -620,31 +623,3 @@ def map_unternehmensregister_json(data: dict) -> Company:
result["relationships"].append(people)
result = map_co_relation(result)
return Company(**result)
-
-
-if __name__ == "__main__":
- from loguru import logger
-
- base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
- for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
- path = os.path.join(f"{base_path}/export", file)
- with open(path, encoding="utf-8") as file_object:
- try:
- company: Company = map_unternehmensregister_json(
- json.loads(file_object.read())
- )
-
- name = "".join(e for e in company.name if e.isalnum())[:50]
-
- with open(
- f"{base_path}/transformed/{name}.json",
- "w+",
- encoding="utf-8",
- ) as export_file:
- json.dump(
- dataclasses.asdict(company), export_file, ensure_ascii=False
- )
- except Exception as e:
- logger.error(e)
- logger.error(f"Error in processing {path}")
- sys.exit(1)
diff --git a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py
index d175be2..51c1309 100644
--- a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py
+++ b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py
@@ -45,7 +45,6 @@ class CompanyMongoService:
query = {
"id.hr_number": id["hr_number"],
"id.district_court.name": id["district_court"]["name"],
- "id.district_court.city": id["district_court"]["city"],
}
with self.lock:
result = list(self.collection.find(query))