From ed681d7c47d06167c9e9712061c8aedb5f7f2967 Mon Sep 17 00:00:00 2001
From: TrisNol
Date: Tue, 11 Jul 2023 14:20:16 +0200
Subject: [PATCH] refactor: Implement linter feedback

---
 .../API-tests/Unternehmensregister/main.py | 62 ++++++++++++++-----
 1 file changed, 48 insertions(+), 14 deletions(-)

diff --git a/Jupyter/API-tests/Unternehmensregister/main.py b/Jupyter/API-tests/Unternehmensregister/main.py
index bf47cdb..4d8e8c6 100644
--- a/Jupyter/API-tests/Unternehmensregister/main.py
+++ b/Jupyter/API-tests/Unternehmensregister/main.py
@@ -1,18 +1,27 @@
+"""Unternehmensregister Scraping."""
 import glob
+import logging
 import multiprocessing
 import os
 from pathlib import Path
 
 from selenium import webdriver
 from selenium.webdriver.common.by import By
-from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support import expected_conditions as ec
 from selenium.webdriver.support.ui import WebDriverWait
 from tqdm import tqdm
 
+logger = logging.getLogger()
+
 
 def scrape(query: str, download_dir: list[str]):
+    """Fetch results from Unternehmensregister for a given query.
+
+    Args:
+        query (str): Search Query (RegEx supported)
+        download_dir (list[str]): Directory to place output files in
+    """
     download_path = os.path.join(str(Path.cwd()), *download_dir)
-    print(download_path)
     options = webdriver.ChromeOptions()
     preferences = {
         "profile.default_content_settings.popups": 0,
@@ -52,7 +61,7 @@ def scrape(query: str, download_dir: list[str]):
 
     processed_companies = []
 
-    for page_index in tqdm(range(num_pages)):
+    for _ in tqdm(range(num_pages)):
         # Find all "Registerinformationen"
         companies_tab = driver.find_elements(
             By.LINK_TEXT, "Registerinformationen des Registergerichts"
@@ -75,7 +84,7 @@
             driver.find_element(By.LINK_TEXT, "SI").click()
             # Show shopping cart
             wait.until(
-                EC.visibility_of_element_located(
+                ec.visibility_of_element_located(
                     (By.LINK_TEXT, "Dokumentenkorb ansehen")
                 )
             )
@@ -85,12 +94,12 @@
             elems[-2].click()
 
             wait.until(
-                EC.visibility_of_element_located((By.ID, "paymentFormOverview:btnNext"))
+                ec.visibility_of_element_located((By.ID, "paymentFormOverview:btnNext"))
             )
             driver.find_element(By.ID, "paymentFormOverview:btnNext").click()
 
             wait.until(
-                EC.visibility_of_element_located((By.LINK_TEXT, "Zum Dokumentenkorb"))
+                ec.visibility_of_element_located((By.LINK_TEXT, "Zum Dokumentenkorb"))
             )
             driver.find_element(By.LINK_TEXT, "Zum Dokumentenkorb").click()
 
@@ -98,9 +107,7 @@
             driver.find_element(By.CLASS_NAME, "download-wrapper").click()
 
             try:
-                wait.until(
-                    lambda x: wait_for_download_condition(download_path, num_files)
-                )
+                wait.until(lambda _: wait_for_download_condition(download_path, num_files))
                 file_name = "".join(e for e in company_name if e.isalnum()) + ".xml"
                 rename_latest_file(
                     download_path,
@@ -108,9 +115,9 @@
                 )
                 processed_companies.append(company_name)
             except Exception:
-                pass
+                logger.warning("Exception caught in Scraping")
             finally:
-                for click_counter in range(6):
+                for _ in range(6):
                     driver.back()
         driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click()
     driver.close()
@@ -119,34 +126,61 @@
 def wait_for_download_condition(
     path: str, num_files: int, pattern: str = "*.xml"
 ) -> bool:
+    """Selenium wait condition monitoring the number of files in a dir.
+
+    Args:
+        path (str): Directory path
+        num_files (int): Number of files present before the download started
+        pattern (str, optional): File pattern. Defaults to "*.xml".
+
+    Returns:
+        bool: True once the number of matching files exceeds num_files
+    """
     return len(glob.glob1(path, pattern)) > num_files
 
 
 def get_num_files(path: str, pattern: str = "*.xml") -> int:
+    """Get number of files in directory.
+
+    Args:
+        path (str): Directory to scan
+        pattern (str, optional): File pattern. Defaults to "*.xml".
+
+    Returns:
+        int: Number of files matching pattern
+    """
     return len(glob.glob1(path, pattern))
 
 
 def rename_latest_file(path: str, filename: str, pattern: str = "*.xml"):
+    """Rename the file in dir with the latest creation time.
+
+    Args:
+        path (str): Dir to check
+        filename (str): New name for the file
+        pattern (str, optional): File pattern. Defaults to "*.xml".
+    """
     list_of_files = [os.path.join(path, file) for file in glob.glob1(path, pattern)]
     latest_download = max(list_of_files, key=os.path.getctime)
     os.rename(latest_download, os.path.join(path, filename))
 
 
 if __name__ == "__main__":
+    # Main procedure
     import pandas as pd
 
-    df = pd.read_excel(
+    df_relevant_companies = pd.read_excel(
         "./data/study_id42887_top-100-unternehmen-deutschland.xlsx",
         sheet_name="Toplist",
         skiprows=1,
     )
-    df = df[df["Name"].notna()]
+    df_relevant_companies = df_relevant_companies[df_relevant_companies["Name"].notna()]
 
     batch_size = 5
     pool = multiprocessing.Pool(processes=batch_size)
     params = [
         (query, ["data", "Unternehmensregister", "scraping", query.strip()])
-        for query in df.Name
+        for query in df_relevant_companies.Name
     ]
     # Map the scrape function to the parameter list using the Pool
     pool.starmap(scrape, params)
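
A note on the download wait above, since it is the subtlest part of this
refactor: WebDriverWait.until() expects a callable that it re-invokes with the
driver until the result is truthy, so wait_for_download_condition must stay
wrapped in a lambda rather than being evaluated once when until() is called.
A minimal standalone sketch of the pattern follows; the "downloads" directory,
the count_xml helper, and the bare Chrome driver are illustrative assumptions,
not part of this patch:

    import glob
    import os

    from selenium import webdriver
    from selenium.webdriver.support.ui import WebDriverWait

    def count_xml(path: str) -> int:
        # Illustrative helper: count the *.xml files currently in `path`.
        return len(glob.glob(os.path.join(path, "*.xml")))

    driver = webdriver.Chrome()
    wait = WebDriverWait(driver, timeout=60)
    num_files = count_xml("downloads")  # snapshot before the download starts
    # ... navigate and trigger the download here ...
    # until() polls the callable (passing the driver) until it returns a
    # truthy value or the timeout expires; handing it a plain bool instead
    # would raise TypeError, since a bool is not callable.
    wait.until(lambda _: count_xml("downloads") > num_files)
    driver.quit()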