From 92d14b58240060e9182b7cd75c5e2c695d661934 Mon Sep 17 00:00:00 2001
From: Tristan Nolde
Date: Mon, 4 Dec 2023 19:05:52 +0100
Subject: [PATCH] Ingest schedule (#391)

Includes a new "app" running the ingestion jobs (aka fetch_news and
find_missing_companies + enrich_company_financials) on a schedule. This also
fixes an issue with the previous schedule implementation by persisting the
schedule in a file that survives new deployments, so scheduling continues
where it left off.
---
 Dockerfile                                    |   2 +-
 local-docker-compose.yml                      |   4 +
 pyproject.toml                                |   4 +-
 .../apps/enrich_company_financials.py         |  92 ++++++++++-----
 .../apps/fetch_news.py                        |   6 +-
 .../apps/find_missing_companies.py            |  99 ++++++++++-------
 .../apps/ingest.py                            | 105 ++++++++++++++++++
 .../unternehmensregister/extract.py           |   3 +-
 tests/apps/fetch_news_test.py                 |   6 +-
 tests/apps/ingest_test.py                     |  41 +++++++
 10 files changed, 280 insertions(+), 82 deletions(-)
 create mode 100644 src/aki_prj23_transparenzregister/apps/ingest.py
 create mode 100644 tests/apps/ingest_test.py

diff --git a/Dockerfile b/Dockerfile
index 96c04a3..c225d67 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -38,7 +38,7 @@ RUN dpkg -i google-chrome-stable_current_amd64.deb; apt-get -fy install
 RUN pip install --find-links=dist aki-prj23-transparenzregister[ingest] --no-cache-dir && \
     rm dist/ -R
 
-ENTRYPOINT ["fetch-news-schedule", "ENV"]
+ENTRYPOINT ["ingest", "ENV"]
 CMD ["--level", "DEBUG"]
 
 FROM base as data-transformation
diff --git a/local-docker-compose.yml b/local-docker-compose.yml
index bdb4ad2..ed3d5ec 100644
--- a/local-docker-compose.yml
+++ b/local-docker-compose.yml
@@ -17,6 +17,9 @@ services:
       PYTHON_MONGO_PORT: ${PYTHON_MONGO_PORT:-27017}
       PYTHON_MONGO_DATABASE: ${PYTHON_MONGO_DATABASE:-transparenzregister}
       PYTHON_INGEST_SCHEDULE: ${PYTHON_INGEST_SCHEDULE:-4}
+      PYTHON_INGEST_SCHEDULE_FILE: ${PYTHON_INGEST_SCHEDULE_FILE:-/data/ingest_schedule.json}
+    volumes:
+      - ingest_data:/data
 
   mongodb:
     image: mongo:4.4.6
@@ -91,6 +94,7 @@ services:
 volumes:
   postgres_data:
   mongo_data:
+  ingest_data:
 
 networks:
   default:
diff --git a/pyproject.toml b/pyproject.toml
index 34d2049..503dd2f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -143,7 +143,9 @@ pytest-repeat = "^0.9.1"
 copy-sql = "aki_prj23_transparenzregister.utils.sql.copy_sql:copy_db_cli"
 data-processing = "aki_prj23_transparenzregister.utils.data_processing:cli"
 data-transformation = "aki_prj23_transparenzregister.utils.data_transfer:transfer_data_cli"
-fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:fetch_news_cli"
+fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:cli"
+find-missing-companies = "aki_prj23_transparenzregister.apps.find_missing_companies:cli"
+ingest = "aki_prj23_transparenzregister.apps.ingest:cli"
 reset-sql = "aki_prj23_transparenzregister.utils.sql.reset_sql:cli"
 webserver = "aki_prj23_transparenzregister.ui.app:main"
 
diff --git a/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py b/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py
index ce39892..8c9c4d7 100644
--- a/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py
+++ b/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py
@@ -1,16 +1,76 @@
 """Add financial data to companies."""
+import argparse
+import concurrent.futures
+import sys
 import typing
 
-from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
+from loguru import logger
+
+from aki_prj23_transparenzregister.config.config_providers import (
+    ConfigProvider,
+    get_config_provider,
+)
 from aki_prj23_transparenzregister.utils.data_extraction.bundesanzeiger import (
     Bundesanzeiger,
 )
+from aki_prj23_transparenzregister.utils.logger_config import (
+    add_logger_options_to_argparse,
+    configer_logger,
+)
 from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
     CompanyMongoService,
 )
 from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
 
 
+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister enriching companies with missing financial data",
+        description="Filters all raw companies with missing financial info from the MongoDB and enriches them with yearly result data from the Bundesanzeiger.",
+        epilog="Example: enrich-company-financials --log-level ERROR --log-path print.log",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+    main(config_provider)
+
+
+def main(config_provider: ConfigProvider) -> None:  # pragma: no cover
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
+    company_service = CompanyMongoService(mongo_connector)
+
+    num_threads = 5
+    companies = company_service.get_where_no_financial_results()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
+        # Submit tasks for each entry in the list
+        future_to_entry = {
+            executor.submit(work, entry, company_service): entry for entry in companies
+        }
+        # Wait for all tasks to complete
+        for future in concurrent.futures.as_completed(future_to_entry):
+            entry = future_to_entry[future]
+            logger.info(f"Finished processing: {entry['name']}")
+            try:
+                # Get the result of the completed task (if needed)
+                future.result()
+            except Exception as e:
+                logger.error(f"Encountered error {e}")
+
+
 def work(company: typing.Any, company_service: CompanyMongoService) -> None:
     """Process company regarding financials.
 
@@ -32,32 +92,4 @@ def work(company: typing.Any, company_service: CompanyMongoService) -> None:
 
 
 if __name__ == "__main__":
-    import concurrent.futures
-
-    from loguru import logger
-
-    config_provider = JsonFileConfigProvider("./secrets.json")
-
-    mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
-    company_service = CompanyMongoService(mongo_connector)
-
-    num_threads = 25
-    companies = company_service.get_where_no_financial_results()
-    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
-        # Submit tasks for each entry in the list
-        future_to_entry = {
-            executor.submit(work, entry, company_service): entry for entry in companies
-        }
-
-        # with tqdm(total=len(companies)) as pbar:
-        # Wait for all tasks to complete
-        for future in concurrent.futures.as_completed(future_to_entry):
-            entry = future_to_entry[future]
-            logger.info(entry["name"])
-            try:
-                # Get the result of the completed task (if needed)
-                result = future.result()
-                # pbar.set_description(entry["name"])
-                # pbar.update(1)
-            except Exception as e:
-                logger.error(f"Error processing entry {e}")
+    cli()
diff --git a/src/aki_prj23_transparenzregister/apps/fetch_news.py b/src/aki_prj23_transparenzregister/apps/fetch_news.py
index 434420a..99f0fbc 100644
--- a/src/aki_prj23_transparenzregister/apps/fetch_news.py
+++ b/src/aki_prj23_transparenzregister/apps/fetch_news.py
@@ -28,7 +28,7 @@ from aki_prj23_transparenzregister.utils.mongo.news_mongo_service import (
 )
 
 
-def fetch_news_cli() -> None:  # pragma: no cover
+def cli() -> None:  # pragma: no cover
     """A cli interface to fetch latest news articles on a schedule."""
     parser = argparse.ArgumentParser(
         prog="Fetch News on schedule",
@@ -49,14 +49,14 @@ def fetch_news_cli() -> None:  # pragma: no cover
     schedule = int(os.getenv("PYTHON_INGEST_SCHEDULE", "6"))
     logger.info(f"Scheduled job will run every {schedule} hours")
 
-    every(schedule).hours.do(schedule, config_provider)
+    every(schedule).hours.do(main, config_provider)
 
     while True:
         run_pending()
         time.sleep(1)
 
 
-def schedule(config_provider: ConfigProvider) -> int:
+def main(config_provider: ConfigProvider) -> int:
     """Scheduled job to fetch news articles and transfer them to MongoDB.
 
     Args:
diff --git a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
index 29ec97d..7b0ccae 100644
--- a/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
+++ b/src/aki_prj23_transparenzregister/apps/find_missing_companies.py
@@ -33,6 +33,58 @@ from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
 from aki_prj23_transparenzregister.utils.sql import connector, entities
 
 
+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister Find Missing Companies",
+        description="Fetches missing companies found in the data processing step from the SQL db and ingests their metadata.",
+        epilog="Example: find-missing-companies secrets.json",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+
+    main(config_provider)
+
+
+def main(config_provider: ConfigProvider) -> None:
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    session = connector.get_session(config_provider)
+
+    # TODO Ensure that max. 60 entries are mined per hour as per the handelsregister.de terms of service (see: https://www.handelsregister.de/rp_web/impressum.xhtml)
+    missing_companies = (
+        session.query(entities.MissingCompany)
+        .where(entities.MissingCompany.searched_for == False)  # noqa
+        .order_by(entities.MissingCompany.number_of_links.desc())
+        .limit(5)
+    )
+
+    batch_size = 5
+    pool = multiprocessing.Pool(processes=batch_size)
+    # Scrape data from unternehmensregister
+    params = [(company.name, config_provider) for company in missing_companies]
+    # Map the process_handler function to the parameter list using the Pool
+    pool.starmap(work, params)
+
+    # Close the Pool to prevent any more tasks from being submitted
+    pool.close()
+
+    # Wait for all the processes to complete
+    pool.join()
+
+
 def work(company_name: str, config_provider: ConfigProvider) -> None:
     """Main method.
 
@@ -40,11 +92,12 @@ def work(company_name: str, config_provider: ConfigProvider) -> None:
         company_name (str): Name of the company to search for
         config_provider (ConfigProvider): ConfigProvider
     """
+    logger.info(f"Processing {company_name}")
     with tempfile.TemporaryDirectory() as tmp_dir:
         xml_dir = os.path.join(*[tmp_dir, "xml"])
         os.makedirs(xml_dir, exist_ok=True)
         try:
-            extract.scrape(company_name, xml_dir, True, True)  # type: ignore
+            extract.scrape(company_name, xml_dir, False, True)  # type: ignore
         except Exception as e:
             logger.error(e)
             return
@@ -96,50 +149,12 @@ def work(company_name: str, config_provider: ConfigProvider) -> None:
                 company.searched_for = True  # type: ignore
                 session.commit()
                 logger.info(f"Processed {company_name}")
+            else:
+                logger.info(f"No results for {company_name}")
         except Exception as e:
             logger.error(e)
             return
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        prog="Transparenzregister Webserver",
-        description="Starts an Dash Webserver that shows our Analysis.",
-        epilog="Example: webserver --log-level ERROR --log-path print.log",
-    )
-    parser.add_argument(
-        "config",
-        metavar="config",
-        default="ENV",
-    )
-    add_logger_options_to_argparse(parser)
-
-    parsed = parser.parse_args(sys.argv[1:])
-    configer_logger(namespace=parsed)
-    config = parsed.config
-    config_provider = get_config_provider(config)
-    session = connector.get_session(config_provider)
-
-    company_mongo_service = CompanyMongoService(
-        MongoConnector(config_provider.get_mongo_connection_string())
-    )
-
-    missing_companies = (
-        session.query(entities.MissingCompany)
-        .where(entities.MissingCompany.searched_for == False)  # noqa
-        .all()
-    )
-
-    batch_size = 5
-    pool = multiprocessing.Pool(processes=batch_size)
-    # Scrape data from unternehmensregister
-    params = [(company.name, config_provider) for company in missing_companies]
-    # Map the process_handler function to the parameter list using the Pool
-    pool.starmap(work, params)
-
-    # Close the Pool to prevent any more tasks from being submitted
-    pool.close()
-
-    # Wait for all the processes to complete
-    pool.join()
-    # for company in tqdm(missing_companies):
+    cli()
diff --git a/src/aki_prj23_transparenzregister/apps/ingest.py b/src/aki_prj23_transparenzregister/apps/ingest.py
new file mode 100644
index 0000000..281dd06
--- /dev/null
+++ b/src/aki_prj23_transparenzregister/apps/ingest.py
@@ -0,0 +1,105 @@
+"""Ingestion entry point."""
+import argparse
+import json
+import os
+import sys
+import time
+
+from schedule import every, get_jobs, run_pending
+
+from aki_prj23_transparenzregister.apps import fetch_news
+from aki_prj23_transparenzregister.apps.enrich_company_financials import (
+    main as enrich_company_financials_main,
+)
+from aki_prj23_transparenzregister.apps.find_missing_companies import (
+    main as find_missing_companies_main,
+)
+from aki_prj23_transparenzregister.config.config_providers import (
+    ConfigProvider,
+    get_config_provider,
+)
+from aki_prj23_transparenzregister.utils.logger_config import (
+    add_logger_options_to_argparse,
+    configer_logger,
+)
+
+
+def load_schedule(schedule_file: str) -> dict:
+    """Load schedule data from file.
+
+    Returns:
+        dict: Schedule data
+    """
+    try:
+        with open(schedule_file) as file:
+            return json.load(file)
+    except FileNotFoundError:
+        return {}
+
+
+def save_schedule(schedule_data: dict, schedule_file: str) -> None:
+    """Persist schedule data to file.
+
+    Args:
+        schedule_data (dict): Schedule data
+        schedule_file (str): Schedule file path
+    """
+    with open(schedule_file, "w") as file:
+        json.dump(schedule_data, file, default=str)
+
+
+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister Company ingestion",
+        description="Ingests all missing companies and enriches them with financial data - runs on a schedule.",
+        epilog="Example: ingest --log-level ERROR --log-path print.log",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+
+    schedule_file = os.getenv("PYTHON_INGEST_SCHEDULE_FILE", "schedule.json")
+    # Load existing schedule data
+    schedule_data = load_schedule(schedule_file)
+
+    # Schedule tasks or resume scheduling based on last execution times
+    every(6).hours.do(fetch_news.main, config_provider).tag("fetch_news")
+    every(3).hours.do(main, config_provider).tag("missing_companies_and_financials")
+
+    # Run the scheduler in a persistent loop
+    while True:
+        run_pending()
+
+        # Update next run times in schedule data
+        for job in get_jobs():
+            schedule_data[list(job.tags)[0]] = job.next_run
+
+        # Save schedule data
+        save_schedule(schedule_data, schedule_file)
+
+        time.sleep(1)
+
+
+def main(config_provider: ConfigProvider) -> None:
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    # First, find missing companies
+    find_missing_companies_main(config_provider)
+    # Then, enrich them with financial data
+    enrich_company_financials_main(config_provider)
+
+
+if __name__ == "__main__":
+    cli()
diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
index fde0e48..93e33f5 100644
--- a/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
+++ b/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py
@@ -138,8 +138,9 @@ def scrape(  # noqa: PLR0915
                     file_name,
                 )
                 processed_companies.append(company_name)
-            except Exception:
+            except Exception as e:
                 logger.warning("Exception caught in Scraping")
+                logger.warning(e)
             finally:
                 for _ in range(pages_navigated):  # should be 6
                     driver.back()
diff --git a/tests/apps/fetch_news_test.py b/tests/apps/fetch_news_test.py
index 81063f3..ab91b75 100644
--- a/tests/apps/fetch_news_test.py
+++ b/tests/apps/fetch_news_test.py
@@ -62,9 +62,7 @@ def test_schedule(
     mock_handelsblatt_rss.return_value = Mock(
         get_news_for_category=Mock(return_value=mock_news_handelsblatt)
     )
-    assert fetch_news.schedule(Mock()) == len(
-        mock_news_handelsblatt + mock_news_tagesschau
-    )
+    assert fetch_news.main(Mock()) == len(mock_news_handelsblatt + mock_news_tagesschau)
 
 
 @patch("aki_prj23_transparenzregister.apps.fetch_news.MongoNewsService")
@@ -90,4 +88,4 @@ def test_schedule_error(
     mock_handelsblatt_rss.return_value = Mock(
         get_news_for_category=Mock(return_value=mock_news_handelsblatt)
     )
-    assert fetch_news.schedule(Mock()) == 0
+    assert fetch_news.main(Mock()) == 0
diff --git a/tests/apps/ingest_test.py b/tests/apps/ingest_test.py
new file mode 100644
index 0000000..31df635
--- /dev/null
+++ b/tests/apps/ingest_test.py
@@ -0,0 +1,41 @@
+"""Testing apps/ingest.py."""
+import json
+import tempfile
+from unittest.mock import Mock, patch
+
+from aki_prj23_transparenzregister.apps import ingest
+
+
+def test_import() -> None:
+    assert ingest
+
+
+def test_load_schedule() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        data = {"test": "test"}
+        path = f"{temp_dir}/schedule.json"
+        with open(path, "w") as file:
+            json.dump(data, file)
+        assert ingest.load_schedule(path) == data
+
+
+def test_load_scheduler_no_result() -> None:
+    assert ingest.load_schedule("./hello_there.json") == {}
+
+
+def test_save_schedule() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        data = {"test": "test"}
+        path = f"{temp_dir}/schedule.json"
+
+        ingest.save_schedule(data, path)
+        with open(path) as file:
+            assert json.load(file) == data
+
+
+@patch("aki_prj23_transparenzregister.apps.ingest.find_missing_companies_main")
+@patch("aki_prj23_transparenzregister.apps.ingest.enrich_company_financials_main")
+def test_main(mock_financials: Mock, mock_find_missing: Mock) -> None:
+    ingest.main(Mock())
+    assert mock_financials.called
+    assert mock_find_missing.called