checkpoint: Refactoring, first working version of processing

This commit is contained in:
TrisNol 2023-11-03 22:39:27 +01:00
parent 2458ad98ff
commit 042a019628
8 changed files with 132 additions and 116 deletions

View File

@@ -5,6 +5,7 @@ import glob
 import argparse
 import tempfile
 import dataclasses
+import multiprocessing
 import pandas as pd
 from tqdm import tqdm
 from pathlib import Path
@@ -21,11 +22,76 @@ from aki_prj23_transparenzregister.utils.logger_config import (
 from aki_prj23_transparenzregister.utils.sql import connector
 from aki_prj23_transparenzregister.utils.sql import entities
+from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
+from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
+    CompanyMongoService,
+)
 from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister import (
     extract,
     load,
 )
-from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import main as transform
+from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform import (
+    main as transform,
+)
+
+
+def work(company: entities.Company, configProvider) -> None:
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        xml_dir = os.path.join(*[tmp_dir, "xml"])
+        os.makedirs(xml_dir, exist_ok=True)
+        try:
+            extract.scrape(company.name, xml_dir, True)
+        except Exception as e:
+            logger.error(e)
+            return
+
+        output_path = os.path.join(*[tmp_dir, "transformed"])
+        os.makedirs(output_path, exist_ok=True)
+        json_dir = os.path.join(*[tmp_dir, "json"])
+        os.makedirs(json_dir, exist_ok=True)
+
+        transform.transform_xml_to_json(
+            xml_dir,
+            json_dir,
+        )
+
+        for file in tqdm(glob.glob1(json_dir, "*.json")):
+            try:
+                path = os.path.join(json_dir, file)
+                with open(path, encoding="utf-8") as file_object:
+                    company_mapped = transform.map_unternehmensregister_json(
+                        json.loads(file_object.read())
+                    )
+                    name = "".join(e for e in company_mapped.name if e.isalnum())[:50]
+                    with open(
+                        os.path.join(output_path, f"{name}.json"),
+                        "w+",
+                        encoding="utf-8",
+                    ) as export_file:
+                        json.dump(
+                            dataclasses.asdict(company_mapped), export_file, ensure_ascii=False
+                        )
+            except Exception as e:
+                logger.error(e)
+                return
+
+        mongoConnector = MongoConnector(configProvider.get_mongo_connection_string())
+        companyMongoService = CompanyMongoService(mongoConnector)
+        num_processed = load.load_directory_to_mongo(output_path, companyMongoService)
+        mongoConnector.client.close()
+
+        try:
+            if num_processed > 0:
+                with connector.get_session(configProvider) as session:
+                    company = session.query(entities.MissingCompany).where(entities.MissingCompany.name == company.name).first()
+                    company.searched_for = True
+                    session.commit()
+                    print(f"Processed {company.name}")
+        except Exception as e:
+            logger.error(e)
+            return
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@ -43,44 +109,29 @@ if __name__ == "__main__":
parsed = parser.parse_args(sys.argv[1:]) parsed = parser.parse_args(sys.argv[1:])
configer_logger(namespace=parsed) configer_logger(namespace=parsed)
config = parsed.config config = parsed.config
# session = connector.get_session(get_config_provider(config)) configProvider = get_config_provider(config)
# missing_companies = session.query(entities.MissingCompany).all() session = connector.get_session(configProvider)
counter = 0 companyMongoService = CompanyMongoService(
# # Scrape data from unternehmensregister MongoConnector(configProvider.get_mongo_connection_string())
# for company in missing_companies:
# print(company.name)
# extract.scrape(company.name, ["tmp", "xml"])
# Transform input
output_path = os.path.join(str(Path.cwd()), *["tmp", "transformed"])
xml_dir = os.path.join(str(Path.cwd()), *["tmp", "xml"])
json_dir = os.path.join(str(Path.cwd()), *["tmp", "json"])
transform.transform_xml_to_json(
os.path.join(xml_dir),
os.path.join(json_dir),
) )
for file in tqdm(glob.glob1(json_dir, "*.json")):
path = os.path.join(json_dir, file)
with open(path, encoding="utf-8") as file_object:
# try:
print(path)
company = transform.map_unternehmensregister_json(
json.loads(file_object.read())
)
name = "".join(e for e in company.name if e.isalnum())[:50] missing_companies = session.query(entities.MissingCompany).where(entities.MissingCompany.searched_for == False).all()
with open( batch_size = 5
f"{output_path}/{name}.json", pool = multiprocessing.Pool(processes=batch_size)
"w+", # Scrape data from unternehmensregister
encoding="utf-8", params = [
) as export_file: (company, configProvider)
json.dump( for company in missing_companies
dataclasses.asdict(company), export_file, ensure_ascii=False ]
) # Map the process_handler function to the parameter list using the Pool
# except Exception as e: pool.starmap(work, params)
# logger.error(e.with_traceback())
# logger.error(e) # Close the Pool to prevent any more tasks from being submitted
# logger.error(f"Error in processing {path}") pool.close()
# sys.exit(1)
# Wait for all the processes to complete
pool.join()
# for company in tqdm(missing_companies):
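
Note on the pattern above: multiprocessing.Pool.starmap unpacks each tuple in the
parameter list into the worker's positional arguments and blocks until every task
has finished; close()/join() then shut the pool down cleanly. A minimal,
self-contained sketch of the same idea (process_company and jobs are illustrative
names, not part of this repo):

    import multiprocessing


    def process_company(name: str, config: dict) -> None:
        # Stand-in for the per-company scrape -> transform -> load pipeline.
        print(f"processing {name} with {config}")


    if __name__ == "__main__":
        jobs = [("Foo GmbH", {"db": "mongo"}), ("Bar AG", {"db": "mongo"})]
        pool = multiprocessing.Pool(processes=5)
        # Each tuple becomes process_company(name, config); starmap blocks
        # until all results are in.
        pool.starmap(process_company, jobs)
        pool.close()  # no further tasks may be submitted
        pool.join()   # wait for the worker processes to exit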

View File

@@ -13,15 +13,15 @@ from selenium.webdriver.support.ui import WebDriverWait
 from tqdm import tqdm


-def scrape(query: str, download_dir: list[str]) -> None:
+def scrape(query: str, download_dir: str, full_match: bool = False) -> None:
     """Fetch results from Unternehmensregister for given query.

     Args:
         query (str): Search Query (RegEx supported)
         download_dir (list[str]): Directory to place output files in
     """
-    download_path = os.path.join(str(Path.cwd()), *download_dir)
-    print(download_path)
+    # download_path = os.path.join(str(Path.cwd()), *download_dir)
+    download_path = download_dir
     options = webdriver.ChromeOptions()
     preferences = {
         "profile.default_content_settings.popups": 0,
@@ -33,8 +33,9 @@ def scrape(query: str, download_dir: list[str]) -> None:
             "default_directory": download_path,
         },
     }
-    # options.add_argument("--headless=new")
+    options.add_argument("--headless=new")
     options.add_experimental_option("prefs", preferences)
+    options.add_experimental_option("excludeSwitches", ["enable-logging"])

     driver = webdriver.Chrome(options=options)
@@ -74,7 +75,7 @@ def scrape(query: str, download_dir: list[str]) -> None:
     ]
     for index, company_link in enumerate(companies_tab):
         company_name = company_names[index]
-        if company_name in processed_companies:
+        if company_name in processed_companies or (full_match == True and company_name != query):
             continue
         # Go to intermediary page
         company_link.click()
@@ -121,6 +122,8 @@ def scrape(query: str, download_dir: list[str]) -> None:
         finally:
             for _ in range(6):
                 driver.back()
+        if company_name == query and full_match == True:
+            break
     driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click()
     driver.close()
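
Note on the headless switch above: Chrome's newer "--headless=new" mode honors the
download preferences, which the legacy headless mode did not support out of the
box, so enabling it here is what lets the scraper run unattended. A self-contained
sketch of the option wiring (URL and directory are placeholders):

    from selenium import webdriver

    options = webdriver.ChromeOptions()
    options.add_argument("--headless=new")  # headless mode that allows downloads
    options.add_experimental_option(
        "prefs",
        {
            "download": {
                "prompt_for_download": False,
                "default_directory": "/tmp/xml",  # must be an absolute path
            }
        },
    )
    # Suppress noisy DevTools logging on Windows
    options.add_experimental_option("excludeSwitches", ["enable-logging"])

    driver = webdriver.Chrome(options=options)
    driver.get("https://www.unternehmensregister.de")  # placeholder target
    driver.quit()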

View File

@@ -14,17 +14,22 @@ from aki_prj23_transparenzregister.utils.mongo.connector import (
     MongoConnector,
 )


+def load_directory_to_mongo(base_path: str, service: CompanyMongoService) -> int:
+    num_processed = 0
+    for file in tqdm(glob.glob1(base_path, "*.json")):
+        path = os.path.join(base_path, file)
+        with open(path, encoding="utf-8") as file_object:
+            data = json.loads(file_object.read())
+            company: Company = Company(**data)
+            service.migrations_of_base_data(company)
+            num_processed += 1
+    return num_processed
+
+
 if __name__ == "__main__":
     provider = JsonFileConfigProvider("secrets.json")
     conn_string = provider.get_mongo_connection_string()
     connector = MongoConnector(conn_string)
     service = CompanyMongoService(connector)
-    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
-    for file in tqdm(glob.glob1(f"{base_path}/transformed", "*.json")):
-        path = os.path.join(f"{base_path}/transformed", file)
-        with open(path, encoding="utf-8") as file_object:
-            data = json.loads(file_object.read())
-            company: Company = Company(**data)
-            service.migrations_of_base_data(company)
+    load_directory_to_mongo("./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister/transformed", service)
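
Note: extracting load_directory_to_mongo makes the load step reusable from both
this CLI entry point and the new work() pipeline, and the returned count lets
callers decide whether a company may be flagged as searched_for. A usage sketch
(the config-provider import path is assumed, the directory is a placeholder):

    from aki_prj23_transparenzregister.utils.config import JsonFileConfigProvider  # assumed path
    from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
    from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
        CompanyMongoService,
    )

    provider = JsonFileConfigProvider("secrets.json")
    connector = MongoConnector(provider.get_mongo_connection_string())
    service = CompanyMongoService(connector)

    # Returns the number of JSON documents migrated into MongoDB.
    count = load_directory_to_mongo("./tmp/transformed", service)
    print(f"loaded {count} documents")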

View File

@@ -8,6 +8,7 @@ import sys

 import xmltodict
 from tqdm import tqdm
+from loguru import logger

 from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v1 import v1
 from aki_prj23_transparenzregister.utils.data_extraction.unternehmensregister.transform.v3 import v3
@@ -26,12 +27,14 @@ def transform_xml_to_json(source_dir: str, target_dir: str) -> None:
         target_path = os.path.join(
             target_dir, source_path.split(os.sep)[-1].replace(".xml", ".json")
         )
-        with open(source_path, encoding="utf-8") as source_file:
-            # deepcode ignore HandleUnicode: Weird XML format no other solution
-            data = xmltodict.parse(source_file.read().encode())
-            with open(target_path, "w", encoding="utf-8") as json_file:
-                json_file.write(json.dumps(data))
+        try:
+            with open(source_path, encoding="utf-8") as source_file:
+                # deepcode ignore HandleUnicode: Weird XML format no other solution
+                data = xmltodict.parse(source_file.read().encode())
+                with open(target_path, "w", encoding="utf-8") as json_file:
+                    json_file.write(json.dumps(data))
+        except Exception as e:
+            logger.error(e)


 def determine_version(data: dict):
     if "XJustiz_Daten" in data:

View File

@@ -539,31 +539,3 @@ def map_unternehmensregister_json(data: dict) -> Company:
         result["relationships"].append(people)
     result = map_co_relation(result)
     return Company(**result)
-
-
-if __name__ == "__main__":
-    from loguru import logger
-
-    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
-    for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
-        path = os.path.join(f"{base_path}/export", file)
-        with open(path, encoding="utf-8") as file_object:
-            try:
-                company: Company = map_unternehmensregister_json(
-                    json.loads(file_object.read())
-                )
-                name = "".join(e for e in company.name if e.isalnum())[:50]
-                with open(
-                    f"{base_path}/transformed/{name}.json",
-                    "w+",
-                    encoding="utf-8",
-                ) as export_file:
-                    json.dump(
-                        dataclasses.asdict(company), export_file, ensure_ascii=False
-                    )
-            except Exception as e:
-                logger.error(e)
-                logger.error(f"Error in processing {path}")
-                sys.exit(1)

View File

@@ -385,6 +385,8 @@ def map_capital(data: dict, company_type: CompanyTypeEnum) -> Capital | None:
     ]:
         return None
     # Catch entries having the dict but with null values
+    if isinstance(capital, list):
+        capital = capital[0]
     if not all(capital.values()):
         return None
     return Capital(
@@ -465,9 +467,10 @@ def map_founding_date(data: dict) -> str | None:
         "tns:fachdatenRegister",
         "tns:basisdatenRegister",
         "tns:satzungsdatum",
-        "tns:aktuellesSatzungsdatum",
     ]
-    return traversal(data, path)
+    base = traversal(data, path)
+    if "tns:aktuellesSatzungsdatum" in base:
+        return base["tns:aktuellesSatzungsdatum"]
     # No reliable answer
     return None
@@ -620,31 +623,3 @@ def map_unternehmensregister_json(data: dict) -> Company:
         result["relationships"].append(people)
     result = map_co_relation(result)
     return Company(**result)
-
-
-if __name__ == "__main__":
-    from loguru import logger
-
-    base_path = "./Jupyter/API-tests/Unternehmensregister/data/Unternehmensregister"
-    for file in tqdm(glob.glob1(f"{base_path}/export", "*.json")):
-        path = os.path.join(f"{base_path}/export", file)
-        with open(path, encoding="utf-8") as file_object:
-            try:
-                company: Company = map_unternehmensregister_json(
-                    json.loads(file_object.read())
-                )
-                name = "".join(e for e in company.name if e.isalnum())[:50]
-                with open(
-                    f"{base_path}/transformed/{name}.json",
-                    "w+",
-                    encoding="utf-8",
-                ) as export_file:
-                    json.dump(
-                        dataclasses.asdict(company), export_file, ensure_ascii=False
-                    )
-            except Exception as e:
-                logger.error(e)
-                logger.error(f"Error in processing {path}")
-                sys.exit(1)
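
Note on the map_capital guard added above: xmltodict parses an element that occurs
once into a dict, but repeated sibling elements into a list of dicts, so downstream
code must normalize before indexing. A hedged illustration with simplified tag
names:

    import xmltodict

    single = xmltodict.parse("<r><capital><v>100</v></capital></r>")
    repeated = xmltodict.parse(
        "<r><capital><v>100</v></capital><capital><v>200</v></capital></r>"
    )

    for doc in (single, repeated):
        capital = doc["r"]["capital"]
        if isinstance(capital, list):  # repeated element -> list of dicts
            capital = capital[0]       # keep the first entry, as in map_capital
        print(capital["v"])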

View File

@@ -45,7 +45,6 @@ class CompanyMongoService:
         query = {
             "id.hr_number": id["hr_number"],
             "id.district_court.name": id["district_court"]["name"],
-            "id.district_court.city": id["district_court"]["city"],
         }
         with self.lock:
             result = list(self.collection.find(query))
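
Note: dropping id.district_court.city relaxes the lookup so records whose city
field is absent or spelled differently still match. In pymongo, dotted keys in a
filter address nested document fields, e.g. (connection string and collection
names are placeholders):

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")
    collection = client["transparenzregister"]["companies"]

    query = {
        "id.hr_number": "HRB 12345",
        "id.district_court.name": "Amtsgericht Bochum",
    }
    # Matches on the nested fields regardless of whether
    # id.district_court.city exists on the document.
    results = list(collection.find(query))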

View File

@@ -1099,6 +1099,14 @@
      "execution_count": 19,
      "metadata": {},
      "output_type": "execute_result"
+    },
+    {
+     "ename": "",
+     "evalue": "",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[1;31mThe Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click <a href='https://aka.ms/vscodeJupyterKernelCrash'>here</a> for more info. View Jupyter <a href='command:jupyter.viewOutput'>log</a> for further details."
+     ]
     }
    ],
    "source": [