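"""Scraper for register documents from the German Unternehmensregister.

For each search query, a headless Chrome driven by Selenium walks through the
portal's search results, places the "SI" register document of every company in
the document basket, completes the checkout, downloads the resulting XML file
and renames it after the company. Executed as a script, it reads the company
names from an Excel sheet and scrapes them in parallel worker processes.
"""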
import glob
import multiprocessing
import os
from pathlib import Path

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm


def scrape(query: str, download_dir: list[str]) -> None:
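    """Search the Unternehmensregister for a company and download its documents.

    Args:
        query: Company name to search for.
        download_dir: Path segments, relative to the current working
            directory, of the folder the XML files are downloaded to.
    """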
    download_path = os.path.join(str(Path.cwd()), *download_dir)
    print(download_path)
    # Make sure the download directory exists before Chrome writes into it.
    os.makedirs(download_path, exist_ok=True)

    options = webdriver.ChromeOptions()
    # Let Chrome download into download_path without prompting or popups.
    preferences = {
        "profile.default_content_settings.popups": 0,
        "safebrowsing.enabled": True,
        "download": {
            "directory_upgrade": True,
            "prompt_for_download": False,
            "extensions_to_open": "",
            "default_directory": download_path,
        },
    }
    options.add_argument("--headless=new")
    options.add_experimental_option("prefs", preferences)

    driver = webdriver.Chrome(options=options)

    driver.get("https://www.unternehmensregister.de/ureg/")
    # Accept only the technically required cookies.
    driver.find_elements(
        By.XPATH, '//button[text()="Nur technisch notwendige Cookies akzeptieren"]'
    )[0].click()
    # Enter the search query.
    driver.find_elements(By.ID, "globalSearchForm:extendedResearchCompanyName")[
        0
    ].send_keys(query)
    # Trigger the search.
    driver.find_elements(By.ID, "globalSearchForm:btnExecuteSearchOld")[0].click()
    # Wait until the search results page has loaded.
    wait = WebDriverWait(driver, 15)
    wait.until(
        lambda d: d.current_url != "https://www.unternehmensregister.de/ureg/"
    )

    # The page counter text starts with the total number of result pages.
    num_pages = int(
        driver.find_element(By.XPATH, '//*[@class="page_count"]').text.split(" ")[0]
    )

    # Names of companies whose document has already been downloaded.
    processed_companies: list[str] = []

    for _page in tqdm(range(num_pages)):
        # Collect the "Registerinformationen" links and company names on this page.
        companies_tab = driver.find_elements(
            By.LINK_TEXT, "Registerinformationen des Registergerichts"
        )
        company_names = [
            elem.text
            for elem in driver.find_elements(
                By.XPATH, '//div[@class="company_result"]/span/b'
            )
        ]
        for index, company_link in enumerate(companies_tab):
            company_name = company_names[index]
            if company_name in processed_companies:
                continue
            # Go to the intermediary page of the company.
            company_link.click()
            # Follow the redirect to the register information.
            driver.find_element(By.LINK_TEXT, "Registerinformationen anzeigen").click()
            # Order the "SI" (structured register content) document.
            driver.find_element(By.LINK_TEXT, "SI").click()
            # Open the document basket.
            wait.until(
                EC.visibility_of_element_located(
                    (By.LINK_TEXT, "Dokumentenkorb ansehen")
                )
            )
            driver.find_element(By.LINK_TEXT, "Dokumentenkorb ansehen").click()
            # Get the document: click the form's second-to-last input element.
            elems = driver.find_elements(By.TAG_NAME, "input")
            elems[-2].click()

            # Proceed through the checkout.
            wait.until(
                EC.visibility_of_element_located((By.ID, "paymentFormOverview:btnNext"))
            )
            driver.find_element(By.ID, "paymentFormOverview:btnNext").click()

            wait.until(
                EC.visibility_of_element_located((By.LINK_TEXT, "Zum Dokumentenkorb"))
            )
            driver.find_element(By.LINK_TEXT, "Zum Dokumentenkorb").click()

            # Count the XML files already present, then trigger the download.
            num_files = get_num_files(download_path)
            driver.find_element(By.CLASS_NAME, "download-wrapper").click()

            try:
                # Wait until a new XML file has appeared in the download folder.
                wait.until(
                    lambda _: wait_for_download_condition(download_path, num_files)
                )
                file_name = "".join(e for e in company_name if e.isalnum()) + ".xml"
                rename_latest_file(download_path, file_name)
                processed_companies.append(company_name)
            except Exception:
                # Skip companies whose download did not finish within the timeout.
                pass
            finally:
                # Navigate back to the search result page.
                for _ in range(6):
                    driver.back()
        # Move on to the next result page.
        driver.find_element(By.XPATH, '//*[@class="fas fa-angle-right"]').click()
    driver.quit()


def wait_for_download_condition(
    path: str, num_files: int, pattern: str = "*.xml"
) -> bool:
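    """Check if ``path`` contains more than ``num_files`` files matching ``pattern``."""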
    return len(glob.glob1(path, pattern)) > num_files


def get_num_files(path: str, pattern: str = "*.xml") -> int:
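    """Count the files in ``path`` that match ``pattern``."""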
    return len(glob.glob1(path, pattern))


def rename_latest_file(path: str, filename: str, pattern: str = "*.xml") -> None:
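    """Rename the newest ``pattern`` file in ``path`` to ``filename``."""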
    list_of_files = [os.path.join(path, file) for file in glob.glob1(path, pattern)]
    latest_download = max(list_of_files, key=os.path.getctime)
    os.rename(latest_download, os.path.join(path, filename))


if __name__ == "__main__":
    import pandas as pd

    # Read the list of company names to scrape from the Excel sheet.
    df = pd.read_excel(
        "./data/study_id42887_top-100-unternehmen-deutschland.xlsx",
        sheet_name="Toplist",
        skiprows=1,
    )
    df = df[df["Name"].notna()]

    # Scrape up to five companies in parallel, one browser per process.
    batch_size = 5
    pool = multiprocessing.Pool(processes=batch_size)
    params = [
        (query, ["data", "Unternehmensregister", "scraping", query.strip()])
        for query in df.Name
    ]
    # Map the scrape function over the parameter list using the pool.
    pool.starmap(scrape, params)

    # Close the pool to prevent any more tasks from being submitted.
    pool.close()

    # Wait for all the processes to complete.
    pool.join()