Ingest schedule (#391)

Includes a new "app" running the ingestion jobs (aka fetch_news and
find_missing_companies + enrich_company_financials) on a schedule.

This also fixes an issue with the previous schedule implementation by
persisting the schedule to a file that survives redeployments, so the jobs
continue where they left off.
Tristan Nolde, 2023-12-04 19:05:52 +01:00 (committed by GitHub)
parent 011e169383, commit 92d14b5824
10 changed files with 280 additions and 82 deletions
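The persistence mechanism described above boils down to two small helpers in the new ingest app: after every scheduler tick, each job's tag and next-run time are dumped to a JSON file, and that file is read back on start-up. Below is a minimal sketch of the idea, assuming the schedule library; run_fetch_news is a hypothetical stand-in for the real ingestion jobs (the actual app wires up fetch_news.main and the company ingestion routine instead).

import json
import time

from schedule import every, get_jobs, run_pending

# Default path mirrors the PYTHON_INGEST_SCHEDULE_FILE setting added below.
SCHEDULE_FILE = "/data/ingest_schedule.json"


def load_schedule(path: str) -> dict:
    """Return the persisted tag -> next-run mapping, or {} on first start."""
    try:
        with open(path) as file:
            return json.load(file)
    except FileNotFoundError:
        return {}


def save_schedule(data: dict, path: str) -> None:
    """Write the tag -> next-run mapping so the next deployment can pick it up."""
    with open(path, "w") as file:
        json.dump(data, file, default=str)  # default=str serializes datetime values


def run_fetch_news() -> None:
    """Hypothetical stand-in for a real ingestion job."""
    print("fetching news ...")


previous_runs = load_schedule(SCHEDULE_FILE)  # empty on first start, populated after a redeploy
every(6).hours.do(run_fetch_news).tag("fetch_news")

while True:
    run_pending()
    # Persist each job's next run time, keyed by its tag.
    save_schedule({list(job.tags)[0]: job.next_run for job in get_jobs()}, SCHEDULE_FILE)
    time.sleep(1)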


@@ -38,7 +38,7 @@ RUN dpkg -i google-chrome-stable_current_amd64.deb; apt-get -fy install
 RUN pip install --find-links=dist aki-prj23-transparenzregister[ingest] --no-cache-dir && \
     rm dist/ -R
-ENTRYPOINT ["fetch-news-schedule", "ENV"]
+ENTRYPOINT ["ingest", "ENV"]
 CMD ["--level", "DEBUG"]

 FROM base as data-transformation


@@ -17,6 +17,9 @@ services:
       PYTHON_MONGO_PORT: ${PYTHON_MONGO_PORT:-27017}
       PYTHON_MONGO_DATABASE: ${PYTHON_MONGO_DATABASE:-transparenzregister}
       PYTHON_INGEST_SCHEDULE: ${PYTHON_INGEST_SCHEDULE:-4}
+      PYTHON_INGEST_SCHEDULE_FILE: ${PYTHON_INGEST_SCHEDULE_FILE:-/data/ingest_schedule.json}
+    volumes:
+      - ingest_data:/data
   mongodb:
     image: mongo:4.4.6

@@ -91,6 +94,7 @@ services:
 volumes:
   postgres_data:
   mongo_data:
+  ingest_data:
 networks:
   default:


@@ -143,7 +143,9 @@ pytest-repeat = "^0.9.1"
 copy-sql = "aki_prj23_transparenzregister.utils.sql.copy_sql:copy_db_cli"
 data-processing = "aki_prj23_transparenzregister.utils.data_processing:cli"
 data-transformation = "aki_prj23_transparenzregister.utils.data_transfer:transfer_data_cli"
-fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:fetch_news_cli"
+fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:cli"
+find-missing-companies = "aki_prj23_transparenzregister.apps.find_missing_companies:cli"
+ingest = "aki_prj23_transparenzregister.apps.ingest:cli"
 reset-sql = "aki_prj23_transparenzregister.utils.sql.reset_sql:cli"
 webserver = "aki_prj23_transparenzregister.ui.app:main"


@@ -1,16 +1,76 @@
 """Add financial data to companies."""
+import argparse
+import concurrent.futures
+import sys
 import typing

-from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
+from loguru import logger
+
+from aki_prj23_transparenzregister.config.config_providers import (
+    ConfigProvider,
+    get_config_provider,
+)
 from aki_prj23_transparenzregister.utils.data_extraction.bundesanzeiger import (
     Bundesanzeiger,
 )
+from aki_prj23_transparenzregister.utils.logger_config import (
+    add_logger_options_to_argparse,
+    configer_logger,
+)
 from aki_prj23_transparenzregister.utils.mongo.company_mongo_service import (
     CompanyMongoService,
 )
 from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
+
+
+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister enriching companies with mising financial data",
+        description="Filters all raw companies with missing financial info from the MongoDB and enriches them with yearly result data from the Bundesanzeiger.",
+        epilog="Example: enrich-company-financials --log-level ERROR --log-path print.log",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+    main(config_provider)
+
+
+def main(config_provider: ConfigProvider) -> None:  # pragma: no cover
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
+    company_service = CompanyMongoService(mongo_connector)
+    num_threads = 5
+    companies = company_service.get_where_no_financial_results()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
+        # Submit tasks for each entry in the list
+        future_to_entry = {
+            executor.submit(work, entry, company_service): entry for entry in companies
+        }
+        # Wait for all tasks to complete
+        for future in concurrent.futures.as_completed(future_to_entry):
+            entry = future_to_entry[future]
+            logger.info(f"Starting to process: {entry['name']}")
+            try:
+                # Get the result of the completed task (if needed)
+                future.result()
+            except Exception as e:
+                logger.error(f"Encountered error {e}")
+
+
 def work(company: typing.Any, company_service: CompanyMongoService) -> None:
     """Process company regarding financials.

@@ -32,32 +92,4 @@ def work(company: typing.Any, company_service: CompanyMongoService) -> None:


 if __name__ == "__main__":
-    import concurrent.futures
-
-    from loguru import logger
-
-    config_provider = JsonFileConfigProvider("./secrets.json")
-    mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
-    company_service = CompanyMongoService(mongo_connector)
-
-    num_threads = 25
-    companies = company_service.get_where_no_financial_results()
-
-    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
-        # Submit tasks for each entry in the list
-        future_to_entry = {
-            executor.submit(work, entry, company_service): entry for entry in companies
-        }
-        # with tqdm(total=len(companies)) as pbar:
-        # Wait for all tasks to complete
-        for future in concurrent.futures.as_completed(future_to_entry):
-            entry = future_to_entry[future]
-            logger.info(entry["name"])
-            try:
-                # Get the result of the completed task (if needed)
-                result = future.result()
-                # pbar.set_description(entry["name"])
-                # pbar.update(1)
-            except Exception as e:
-                logger.error(f"Error processing entry {e}")
+    cli()


@@ -28,7 +28,7 @@ from aki_prj23_transparenzregister.utils.mongo.news_mongo_service import (
 )


-def fetch_news_cli() -> None:  # pragma: no cover
+def cli() -> None:  # pragma: no cover
     """A cli interface to fetch latest news articles on a schedule."""
     parser = argparse.ArgumentParser(
         prog="Fetch News on schedule",
@@ -49,14 +49,14 @@ def fetch_news_cli() -> None:  # pragma: no cover
     schedule = int(os.getenv("PYTHON_INGEST_SCHEDULE", "6"))
     logger.info(f"Scheduled job will run every {schedule} hours")
-    every(schedule).hours.do(schedule, config_provider)
+    every(schedule).hours.do(main, config_provider)
     while True:
         run_pending()
         time.sleep(1)


-def schedule(config_provider: ConfigProvider) -> int:
+def main(config_provider: ConfigProvider) -> int:
     """Scheduled job to fetch news articles and transfer them to MongoDB.

     Args:


@@ -33,6 +33,58 @@ from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
 from aki_prj23_transparenzregister.utils.sql import connector, entities


+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister Find Missing Companies",
+        description="Fetches missing companies found in the data processing step from the SQL db and ingests their meta data.",
+        epilog="Example: find-missing-companies secrets.json",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+    main(config_provider)
+
+
+def main(config_provider: ConfigProvider) -> None:
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    session = connector.get_session(config_provider)
+    # TODO Ensure that max. 60 entries are mined per hour as per the handelsregister.de terms of service (see: https://www.handelsregister.de/rp_web/impressum.xhtml)
+    missing_companies = (
+        session.query(entities.MissingCompany)
+        .where(entities.MissingCompany.searched_for == False)  # noqa
+        .order_by(entities.MissingCompany.number_of_links.desc())
+        .limit(5)
+    )
+    batch_size = 5
+    pool = multiprocessing.Pool(processes=batch_size)
+    # Scrape data from unternehmensregister
+    params = [(company.name, config_provider) for company in missing_companies]
+    # Map the process_handler function to the parameter list using the Pool
+    pool.starmap(work, params)
+    # Close the Pool to prevent any more tasks from being submitted
+    pool.close()
+    # Wait for all the processes to complete
+    pool.join()
+
+
 def work(company_name: str, config_provider: ConfigProvider) -> None:
     """Main method.

@@ -40,11 +92,12 @@ def work(company_name: str, config_provider: ConfigProvider) -> None:
         company_name (str): Name of the company to search for
         config_provider (ConfigProvider): ConfigProvider
     """
+    logger.info(f"Processing {company_name}")
     with tempfile.TemporaryDirectory() as tmp_dir:
         xml_dir = os.path.join(*[tmp_dir, "xml"])
         os.makedirs(xml_dir, exist_ok=True)
         try:
-            extract.scrape(company_name, xml_dir, True, True)  # type: ignore
+            extract.scrape(company_name, xml_dir, False, True)  # type: ignore
         except Exception as e:
             logger.error(e)
             return

@@ -96,50 +149,12 @@ def work(company_name: str, config_provider: ConfigProvider) -> None:
             company.searched_for = True  # type: ignore
             session.commit()
             logger.info(f"Processed {company_name}")
+        else:
+            logger.info(f"No results for {company_name}")
     except Exception as e:
         logger.error(e)
         return


 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        prog="Transparenzregister Webserver",
-        description="Starts an Dash Webserver that shows our Analysis.",
-        epilog="Example: webserver --log-level ERROR --log-path print.log",
-    )
-    parser.add_argument(
-        "config",
-        metavar="config",
-        default="ENV",
-    )
-    add_logger_options_to_argparse(parser)
-    parsed = parser.parse_args(sys.argv[1:])
-    configer_logger(namespace=parsed)
-    config = parsed.config
-    config_provider = get_config_provider(config)
-
-    session = connector.get_session(config_provider)
-    company_mongo_service = CompanyMongoService(
-        MongoConnector(config_provider.get_mongo_connection_string())
-    )
-
-    missing_companies = (
-        session.query(entities.MissingCompany)
-        .where(entities.MissingCompany.searched_for == False)  # noqa
-        .all()
-    )
-
-    batch_size = 5
-    pool = multiprocessing.Pool(processes=batch_size)
-    # Scrape data from unternehmensregister
-    params = [(company.name, config_provider) for company in missing_companies]
-    # Map the process_handler function to the parameter list using the Pool
-    pool.starmap(work, params)
-    # Close the Pool to prevent any more tasks from being submitted
-    pool.close()
-    # Wait for all the processes to complete
-    pool.join()
-    # for company in tqdm(missing_companies):
+    cli()
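The TODO in main() above notes the handelsregister.de terms-of-service cap of at most 60 queries per hour. The query already limits each run to 5 companies and the ingest app below triggers this routine every 3 hours, so the cap is respected in practice, but no explicit throttle is implemented yet. A rough sketch of what one could look like, under the stated 60-per-hour assumption (the helper name and signature are made up for illustration):

import time
from collections.abc import Callable, Iterable


def throttle_lookups(
    names: Iterable[str],
    lookup: Callable[[str], None],
    max_per_hour: int = 60,
) -> None:
    """Space out lookup calls so at most max_per_hour of them start per hour."""
    min_interval = 3600 / max_per_hour  # 60 seconds between lookups at the default cap
    for name in names:
        started = time.monotonic()
        lookup(name)  # e.g. the existing work(name, config_provider), wrapped in a closure
        elapsed = time.monotonic() - started
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)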


@@ -0,0 +1,105 @@
+"""Ingestion entry point."""
+import argparse
+import json
+import os
+import sys
+import time
+
+from schedule import every, get_jobs, run_pending
+
+from aki_prj23_transparenzregister.apps import fetch_news
+from aki_prj23_transparenzregister.apps.enrich_company_financials import (
+    main as enrich_company_financials_main,
+)
+from aki_prj23_transparenzregister.apps.find_missing_companies import (
+    main as find_missing_companies_main,
+)
+from aki_prj23_transparenzregister.config.config_providers import (
+    ConfigProvider,
+    get_config_provider,
+)
+from aki_prj23_transparenzregister.utils.logger_config import (
+    add_logger_options_to_argparse,
+    configer_logger,
+)
+
+
+def load_schedule(schedule_file: str) -> dict:
+    """Load scheudle data from file.
+
+    Returns:
+        dict: Schedule data
+    """
+    try:
+        with open(schedule_file) as file:
+            return json.load(file)
+    except FileNotFoundError:
+        return {}
+
+
+def save_schedule(schedule_data: dict, schedule_file: str) -> None:
+    """Persist schedule data to file.
+
+    Args:
+        schedule_data (dict): Schedule data
+        schedule_file (str): Schedule file path
+    """
+    with open(schedule_file, "w") as file:
+        json.dump(schedule_data, file, default=str)
+
+
+def cli() -> None:  # pragma: no cover
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        prog="Transparenzregister Company ingestion",
+        description="Ingests all missing companies and enriches them with finandcial data - runs on scheulde.",
+        epilog="Example: ingest --log-level ERROR --log-path print.log",
+    )
+    parser.add_argument(
+        "config",
+        metavar="config",
+        default="ENV",
+    )
+    add_logger_options_to_argparse(parser)
+    parsed = parser.parse_args(sys.argv[1:])
+    configer_logger(namespace=parsed)
+    config = parsed.config
+    config_provider = get_config_provider(config)
+
+    schedule_file = os.getenv("PYTHON_INGEST_SCHEDULE_FILE", "schedule.json")
+    # Load existing schedule data
+    schedule_data = load_schedule(schedule_file)
+
+    # Schedule tasks or resume scheduling based on last execution times
+    every(6).hours.do(fetch_news.main, config_provider).tag("fetch_news")
+    every(3).hours.do(main, config_provider).tag("missing_compnies_and_financials")
+
+    # Run the scheduler in a persistent loops
+    while True:
+        run_pending()
+        # Update last execution times in schedule data
+        for job in get_jobs():
+            schedule_data[list(job.tags)[0]] = job.next_run
+        # Save schedule data
+        save_schedule(schedule_data, schedule_file)
+        time.sleep(1)
+
+
+def main(config_provider: ConfigProvider) -> None:
+    """Main routine.
+
+    Args:
+        config_provider (ConfigProvider): Configuration provider
+    """
+    # First, find missing companies
+    find_missing_companies_main(config_provider)
+    # Then, enrich them with financial data
+    enrich_company_financials_main(config_provider)
+
+
+if __name__ == "__main__":
+    cli()


@@ -138,8 +138,9 @@ def scrape(  # noqa: PLR0915
                 file_name,
             )
             processed_companies.append(company_name)
-        except Exception:
+        except Exception as e:
             logger.warning("Exception caught in Scraping")
+            logger.warning(e)
         finally:
             for _ in range(pages_navigated):  # should be 6
                 driver.back()


@@ -62,9 +62,7 @@ def test_schedule(
     mock_handelsblatt_rss.return_value = Mock(
         get_news_for_category=Mock(return_value=mock_news_handelsblatt)
     )
-    assert fetch_news.schedule(Mock()) == len(
-        mock_news_handelsblatt + mock_news_tagesschau
-    )
+    assert fetch_news.main(Mock()) == len(mock_news_handelsblatt + mock_news_tagesschau)


 @patch("aki_prj23_transparenzregister.apps.fetch_news.MongoNewsService")
@@ -90,4 +88,4 @@ def test_schedule_error(
     mock_handelsblatt_rss.return_value = Mock(
         get_news_for_category=Mock(return_value=mock_news_handelsblatt)
     )
-    assert fetch_news.schedule(Mock()) == 0
+    assert fetch_news.main(Mock()) == 0

tests/apps/ingest_test.py (new file, 41 lines)

@@ -0,0 +1,41 @@
+"""Testing apps/ingest.py."""
+import json
+import tempfile
+from unittest.mock import Mock, patch
+
+from aki_prj23_transparenzregister.apps import ingest
+
+
+def test_import() -> None:
+    assert ingest
+
+
+def test_load_schedule() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        data = {"test": "test"}
+        path = f"{temp_dir}/schedule.json"
+        with open(path, "w") as file:
+            json.dump({"test": "test"}, file)
+        assert ingest.load_schedule(path) == data
+
+
+def test_load_scheduler_no_result() -> None:
+    assert ingest.load_schedule("./hello_there.json") == {}
+
+
+def test_save_schedule() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        data = {"test": "test"}
+        path = f"{temp_dir}/schedule.json"
+        ingest.save_schedule(data, path)
+        with open(path) as file:
+            assert json.load(file) == data
+
+
+@patch("aki_prj23_transparenzregister.apps.ingest.find_missing_companies_main")
+@patch("aki_prj23_transparenzregister.apps.ingest.enrich_company_financials_main")
+def test_main(mock_financials: Mock, mock_find_missing: Mock) -> None:
+    ingest.main(Mock())
+    assert mock_financials.called
+    assert mock_find_missing.called