"""Unternehmensregister Scraping.""" import glob import logging import multiprocessing import os from pathlib import Path from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.ui import WebDriverWait from tqdm import tqdm logger = logging.getLogger() def scrape(query: str, download_dir: list[str]): """Fetch results from Unternehmensregister for given query. Args: query (str): Search Query (RegEx supported) download_dir (list[str]): Directory to place output files in """ download_path = os.path.join(str(Path.cwd()), *download_dir) options = webdriver.ChromeOptions() preferences = { "profile.default_content_settings.popups": 0, "safebrowsing.enabled": True, "download": { "directory_upgrade": True, "prompt_for_download": False, "extensions_to_open": "", "default_directory": download_path, }, } options.add_argument("--headless=new") options.add_experimental_option("prefs", preferences) driver = webdriver.Chrome(options=options) driver.get("https://www.unternehmensregister.de/ureg/") # Accept Cookies driver.find_elements( By.XPATH, '//button[text()="Nur technisch notwendige Cookies akzeptieren"]' )[0].click() # Enter search query driver.find_elements(By.ID, "globalSearchForm:extendedResearchCompanyName")[ 0 ].send_keys(query) # Trigger search driver.find_elements(By.ID, "globalSearchForm:btnExecuteSearchOld")[0].click() # Wait for results wait = WebDriverWait(driver, 15) wait.until( lambda driver: driver.current_url != "https://www.unternehmensregister.de/ureg/" ) num_pages = int( driver.find_element(By.XPATH, '//*[@class="page_count"]').text.split(" ")[0] ) processed_companies = [] for _ in tqdm(range(num_pages)): # Find all "Registerinformationen" companies_tab = driver.find_elements( By.LINK_TEXT, "Registerinformationen des Registergerichts" ) company_names = [ elem.text for elem in driver.find_elements( By.XPATH, '//div[@class="company_result"]/span/b' ) ] for index, company_link in enumerate(companies_tab): company_name = company_names[index] if company_name in processed_companies: continue # Go to intermediary page company_link.click() # Trigger next redirect driver.find_element(By.LINK_TEXT, "Registerinformationen anzeigen").click() # Trigger SI download driver.find_element(By.LINK_TEXT, "SI").click() # Show shopping cart wait.until( ec.visibility_of_element_located( (By.LINK_TEXT, "Dokumentenkorb ansehen") ) ) driver.find_element(By.LINK_TEXT, "Dokumentenkorb ansehen").click() # Get document elems = driver.find_elements(By.TAG_NAME, "input") elems[-2].click() wait.until( ec.visibility_of_element_located((By.ID, "paymentFormOverview:btnNext")) ) driver.find_element(By.ID, "paymentFormOverview:btnNext").click() wait.until( ec.visibility_of_element_located((By.LINK_TEXT, "Zum Dokumentenkorb")) ) driver.find_element(By.LINK_TEXT, "Zum Dokumentenkorb").click() num_files = get_num_files(download_path) driver.find_element(By.CLASS_NAME, "download-wrapper").click() try: wait.until(wait_for_download_condition(download_path, num_files)) file_name = "".join(e for e in company_name if e.isalnum()) + ".xml" rename_latest_file( download_path, file_name, ) processed_companies.append(company_name) except Exception: logger.warning("Exception caught in Scraping") finally: for _ in range(6): driver.back() driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click() driver.close() def wait_for_download_condition( path: str, num_files: int, pattern: str = "*.xml" ) -> bool: """Selenium wait condition monitoring number of files in a dir. Args: path (str): Directory path num_files (int): Current number of file pattern (str, optional): File pattern. Defaults to "*.xml". Returns: bool: Current num file exceeded """ return len(glob.glob1(path, pattern)) > num_files def get_num_files(path: str, pattern: str = "*.xml") -> int: """Get number of files in directory. Args: path (str): Directory to scan pattern (str, optional): File pattern. Defaults to "*.xml". Returns: int: Number of files matching pattern """ return len(glob.glob1(path, pattern)) def rename_latest_file(path: str, filename: str, pattern: str = "*.xml"): """Rename file in dir with latest change date. Args: path (str): Dir to check filename (str): Name of file pattern (str, optional): File pattern. Defaults to "*.xml". """ list_of_files = [os.path.join(path, file) for file in glob.glob1(path, pattern)] latest_download = max(list_of_files, key=os.path.getctime) os.rename(latest_download, os.path.join(path, filename)) if __name__ == "__main__": """Main procedure""" import pandas as pd df_relevant_companies = pd.read_excel( "./data/study_id42887_top-100-unternehmen-deutschland.xlsx", sheet_name="Toplist", skiprows=1, ) df_relevant_companies = df_relevant_companies[df_relevant_companies["Name"].notna()] batch_size = 5 pool = multiprocessing.Pool(processes=batch_size) params = [ (query, ["data", "Unternehmensregister", "scraping", query.strip()]) for query in df_relevant_companies.Name ] # Map the process_handler function to the parameter list using the Pool pool.starmap(scrape, params) # Close the Pool to prevent any more tasks from being submitted pool.close() # Wait for all the processes to complete pool.join()