From 042a0196281a191ce2f0001896461a07c9493e3b Mon Sep 17 00:00:00 2001 From: TrisNol Date: Fri, 3 Nov 2023 22:39:27 +0100 Subject: [PATCH] checkpoint: Refactoring, first working version of processing --- .../apps/find_missing_companies.py | 127 ++++++++++++------ .../unternehmensregister/extract.py | 13 +- .../unternehmensregister/load.py | 21 +-- .../unternehmensregister/transform/main.py | 15 ++- .../unternehmensregister/transform/v1/v1.py | 28 ---- .../unternehmensregister/transform/v3/v3.py | 35 +---- .../utils/mongo/company_mongo_service.py | 1 - tmp/transformation.ipynb | 8 ++ 8 files changed, 132 insertions(+), 116 deletions(-) diff --git a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py index f8a7b5b..3f0f941 100644 --- a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py +++ b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py @@ -5,6 +5,7 @@ import glob import argparse import tempfile import dataclasses +import multiprocessing import pandas as pd from tqdm import tqdm from pathlib import Path @@ -21,11 +22,76 @@ from aki_prj23_transparenzregister.utils.logger_config import ( from aki_prj23_transparenzregister.utils.sql import connector from aki_prj23_transparenzregister.utils.sql import entities +from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector +from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import ( + CompanyMongoService, +) + from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import ( extract, load, ) -from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import main as transform +from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import ( + main as transform, +) + +def work(company: entities.Company, configProvider) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + xml_dir = os.path.join(*[tmp_dir, "xml"]) + os.makedirs(xml_dir, exist_ok=True) + try: + extract.scrape(company.name, xml_dir, True) + except Exception as e: + logger.error(e) + return + output_path = os.path.join(*[tmp_dir, "transformed"]) + os.makedirs(output_path, exist_ok=True) + json_dir = os.path.join(*[tmp_dir, "json"]) + os.makedirs(json_dir, exist_ok=True) + transform.transform_xml_to_json( + xml_dir, + json_dir, + ) + + for file in tqdm(glob.glob1(json_dir, "*.json")): + try: + path = os.path.join(json_dir, file) + with open(path, encoding="utf-8") as file_object: + company_mapped = transform.map_unternehmensregister_json( + json.loads(file_object.read()) + ) + + name = "".join(e for e in company_mapped.name if e.isalnum())[:50] + + with open( + os.path.join(output_path, f"{name}.json"), + "w+", + encoding="utf-8", + ) as export_file: + json.dump( + dataclasses.asdict(company_mapped), export_file, ensure_ascii=False + ) + except Exception as e: + logger.error(e) + return + mongoConnector = MongoConnector(configProvider.get_mongo_connection_string()) + companyMongoService = CompanyMongoService( + mongoConnector + ) + num_processed = load.load_directory_to_mongo(output_path, companyMongoService) + mongoConnector.client.close() + + try: + if num_processed > 0: + with connector.get_session(configProvider) as session: + company = session.query(entities.MissingCompany).where(entities.MissingCompany.name == company.name).first() + company.searched_for = True + session.commit() + print(f"Processed {company.name}") + except Exception as e: + logger.error(e) + return + if __name__ == "__main__": parser = argparse.ArgumentParser( @@ -43,44 +109,29 @@ if __name__ == "__main__": parsed = parser.parse_args(sys.argv[1:]) configer_logger(namespace=parsed) config = parsed.config - # session = connector.get_session(get_config_provider(config)) - # missing_companies = session.query(entities.MissingCompany).all() + configProvider = get_config_provider(config) + session = connector.get_session(configProvider) - counter = 0 - # # Scrape data from unternehmensregister - # for company in missing_companies: - # print(company.name) - # extract.scrape(company.name, ["tmp", "xml"]) - - # Transform input - output_path = os.path.join(str(Path.cwd()), *["tmp", "transformed"]) - xml_dir = os.path.join(str(Path.cwd()), *["tmp", "xml"]) - json_dir = os.path.join(str(Path.cwd()), *["tmp", "json"]) - transform.transform_xml_to_json( - os.path.join(xml_dir), - os.path.join(json_dir), + companyMongoService = CompanyMongoService( + MongoConnector(configProvider.get_mongo_connection_string()) ) - for file in tqdm(glob.glob1(json_dir, "*.json")): - path = os.path.join(json_dir, file) - with open(path, encoding="utf-8") as file_object: - # try: - print(path) - company = transform.map_unternehmensregister_json( - json.loads(file_object.read()) - ) - name = "".join(e for e in company.name if e.isalnum())[:50] + missing_companies = session.query(entities.MissingCompany).where(entities.MissingCompany.searched_for == False).all() - with open( - f"{output_path}/{name}.json", - "w+", - encoding="utf-8", - ) as export_file: - json.dump( - dataclasses.asdict(company), export_file, ensure_ascii=False - ) - # except Exception as e: - # logger.error(e.with_traceback()) - # logger.error(e) - # logger.error(f"Error in processing {path}") - # sys.exit(1) \ No newline at end of file + batch_size = 5 + pool = multiprocessing.Pool(processes=batch_size) + # Scrape data from unternehmensregister + params = [ + (company, configProvider) + for company in missing_companies + ] + # Map the process_handler function to the parameter list using the Pool + pool.starmap(work, params) + + # Close the Pool to prevent any more tasks from being submitted + pool.close() + + # Wait for all the processes to complete + pool.join() + # for company in tqdm(missing_companies): + diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py index efff716..1343566 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py @@ -13,15 +13,15 @@ from selenium.webdriver.support.ui import WebDriverWait from tqdm import tqdm -def scrape(query: str, download_dir: list[str]) -> None: +def scrape(query: str, download_dir: str, full_match: bool = False) -> None: """Fetch results from Unternehmensregister for given query. Args: query (str): Search Query (RegEx supported) download_dir (list[str]): Directory to place output files in """ - download_path = os.path.join(str(Path.cwd()), *download_dir) - print(download_path) + # download_path = os.path.join(str(Path.cwd()), *download_dir) + download_path = download_dir options = webdriver.ChromeOptions() preferences = { "profile.default_content_settings.popups": 0, @@ -33,8 +33,9 @@ def scrape(query: str, download_dir: list[str]) -> None: "default_directory": download_path, }, } - # options.add_argument("--headless=new") + options.add_argument("--headless=new") options.add_experimental_option("prefs", preferences) + options.add_experimental_option("excludeSwitches", ["enable-logging"]) driver = webdriver.Chrome(options=options) @@ -74,7 +75,7 @@ def scrape(query: str, download_dir: list[str]) -> None: ] for index, company_link in enumerate(companies_tab): company_name = company_names[index] - if company_name in processed_companies: + if company_name in processed_companies or (full_match == True and company_name != query): continue # Go to intermediary page company_link.click() @@ -121,6 +122,8 @@ def scrape(query: str, download_dir: list[str]) -> None: finally: for _ in range(6): driver.back() + if company_name == query and full_match == True: + break driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click() driver.close() diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py index 621b723..4f58bf4 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py @@ -14,17 +14,22 @@ from aki_prj23_transparenzregister.utils.mongo.connector import ( MongoConnector, ) +def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int: + num_processed = 0 + for file in tqdm(glob.glob1(base_path, "*.json")): + path = os.path.join(base_path, file) + with open(path, encoding="utf-8") as file_object: + data = json.loads(file_object.read()) + company: Company = Company(**data) + + service.migrations_of_base_data(company) + num_processed += 1 + return num_processed + if __name__ == "__main__": provider = JsonFileConfigProvider("secrets.json") conn_string = provider.get_mongo_connection_string() connector = MongoConnector(conn_string) service = CompanyMongoService(connector) - base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister" - for file in tqdm(glob.glob1(f"{base_path}/transformed", "*.json")): - path = os.path.join(f"{base_path}/transformed", file) - with open(path, encoding="utf-8") as file_object: - data = json.loads(file_object.read()) - company: Company = Company(**data) - - service.migrations_of_base_data(company) + load_directory_to_mongo("./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed", service) \ No newline at end of file diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py index 717c4d1..9025029 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/main.py @@ -8,6 +8,7 @@ import sys import xmltodict from tqdm import tqdm +from loguru import logger from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import v1 from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import v3 @@ -26,12 +27,14 @@ def transform_xml_to_json(source_dir: str, target_dir: str) -> None: target_path = os.path.join( target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json") ) - - with open(source_path, encoding="utf-8") as source_file: - # deepcode ignore HandleUnicode: Weird XML format no other solution - data = xmltodict.parse(source_file.read().encode()) - with open(target_path, "w", encoding="utf-8") as json_file: - json_file.write(json.dumps(data)) + try: + with open(source_path, encoding="utf-8") as source_file: + # deepcode ignore HandleUnicode: Weird XML format no other solution + data = xmltodict.parse(source_file.read().encode()) + with open(target_path, "w", encoding="utf-8") as json_file: + json_file.write(json.dumps(data)) + except Exception as e: + logger.error(e) def determine_version(data: dict): if "XJustiz_Daten" in data: diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py index 95405cb..92164fa 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v1/v1.py @@ -539,31 +539,3 @@ def map_unternehmensregister_json(data: dict) -> Company: result["relationships"].append(people) result = map_co_relation(result) return Company(**result) - - -if __name__ == "__main__": - from loguru import logger - - base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister" - for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")): - path = os.path.join(f"{base_path}/export", file) - with open(path, encoding="utf-8") as file_object: - try: - company: Company = map_unternehmensregister_json( - json.loads(file_object.read()) - ) - - name = "".join(e for e in company.name if e.isalnum())[:50] - - with open( - f"{base_path}/transformed/{name}.json", - "w+", - encoding="utf-8", - ) as export_file: - json.dump( - dataclasses.asdict(company), export_file, ensure_ascii=False - ) - except Exception as e: - logger.error(e) - logger.error(f"Error in processing {path}") - sys.exit(1) \ No newline at end of file diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py index 61c9371..f66dc69 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform/v3/v3.py @@ -385,6 +385,8 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None: ]: return None # Catch entries having the dict but with null values + if isinstance(capital, list): + capital = capital[0] if not all(capital.values()): return None return Capital( @@ -465,9 +467,10 @@ def map_founding_date(data: dict) -> str | None: "tns:fachdatenRegister", "tns:basisdatenRegister", "tns:satzungsdatum", - "tns:aktuellesSatzungsdatum", ] - return traversal(data, path) + base = traversal(data, path) + if "tns:aktuellesSatzungsdatum" in base: + return base["tns:aktuellesSatzungsdatum"] # No reliable answer return None @@ -620,31 +623,3 @@ def map_unternehmensregister_json(data: dict) -> Company: result["relationships"].append(people) result = map_co_relation(result) return Company(**result) - - -if __name__ == "__main__": - from loguru import logger - - base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister" - for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")): - path = os.path.join(f"{base_path}/export", file) - with open(path, encoding="utf-8") as file_object: - try: - company: Company = map_unternehmensregister_json( - json.loads(file_object.read()) - ) - - name = "".join(e for e in company.name if e.isalnum())[:50] - - with open( - f"{base_path}/transformed/{name}.json", - "w+", - encoding="utf-8", - ) as export_file: - json.dump( - dataclasses.asdict(company), export_file, ensure_ascii=False - ) - except Exception as e: - logger.error(e) - logger.error(f"Error in processing {path}") - sys.exit(1) diff --git a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py index d175be2..51c1309 100644 --- a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py +++ b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py @@ -45,7 +45,6 @@ class CompanyMongoService: query = { "id.hr_number": id["hr_number"], "id.district_court.name": id["district_court"]["name"], - "id.district_court.city": id["district_court"]["city"], } with self.lock: result = list(self.collection.find(query)) diff --git a/tmp/transformation.ipynb b/tmp/transformation.ipynb index 62b0277..cd06c34 100644 --- a/tmp/transformation.ipynb +++ b/tmp/transformation.ipynb @@ -1099,6 +1099,14 @@ "execution_count": 19, "metadata": {}, "output_type": "execute_result" + }, + { + "ename": "", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[1;31mThe Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details." + ] } ], "source": [