Feat/116 scheduling tools (#358)

Init `ingestion` container with `fetch_news` target to retrieve latest
news articles from Tagesschau and Handelsblatt twice a day.

Integration of the `find_missing_companies.py` will follow once this is
merged.
This commit is contained in:
Tristan Nolde 2023-11-11 14:34:51 +01:00 committed by GitHub
commit bbc15bc7a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 583 additions and 9 deletions

View File

@ -24,9 +24,23 @@ FROM base as ingest
LABEL PART="DATA_INGESTOR"
### Install Chrome ###
# Update the package lists
RUN apt-get update
# Install wget and unzip
RUN apt-get install -y wget unzip
# Install Google Chrome
RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
RUN dpkg -i google-chrome-stable_current_amd64.deb; apt-get -fy install
RUN pip install --find-links=dist aki-prj23-transparenzregister[ingest] --no-cache-dir && \
rm dist/ -R
ENTRYPOINT ["fetch-news-schedule", "ENV"]
CMD ["--level", "DEBUG"]
FROM base as data-transformation
LABEL PART="DATA-TRANSFORMATION"

View File

@ -56,12 +56,12 @@ the following layout:
```
PYTHON_POSTGRES_USERNAME=postgres
PYTHON_POSTGRES_PASSWORD=postgres
PYTHON_POSTGRES_HOST=localhost
PYTHON_POSTGRES_HOST=postgres
PYTHON_POSTGRES_DATABASE=postgres
PYTHON_POSTGRES_PORT=5432
PYTHON_MONGO_USERNAME=username
PYTHON_MONGO_HOST=localhost
PYTHON_MONGO_HOST=mongodb
PYTHON_MONGO_PASSWORD=password
PYTHON_MONGO_PORT=27017
PYTHON_MONGO_DATABASE=transparenzregister

View File

@ -13,7 +13,7 @@ services:
PYTHON_MONGO_PORT: ${PYTHON_MONGO_PORT:-27017}
PYTHON_MONGO_DATABASE: ${PYTHON_MONGO_DATABASE:-transparenzregister}
deploy:
replicas: 0
replicas: 1
restart: on-failure:3
mongodb:

13
poetry.lock generated
View File

@ -5556,6 +5556,17 @@ tensorflow = ["safetensors[numpy]", "tensorflow (>=2.11.0)"]
testing = ["h5py (>=3.7.0)", "huggingface_hub (>=0.12.1)", "hypothesis (>=6.70.2)", "pytest (>=7.2.0)", "pytest-benchmark (>=4.0.0)", "safetensors[numpy]", "setuptools_rust (>=1.5.2)"]
torch = ["safetensors[numpy]", "torch (>=1.10)"]
[[package]]
name = "schedule"
version = "1.2.1"
description = "Job scheduling for humans."
optional = false
python-versions = ">=3.7"
files = [
{file = "schedule-1.2.1-py2.py3-none-any.whl", hash = "sha256:14cdeb083a596aa1de6dc77639a1b2ac8bf6eaafa82b1c9279d3612823063d01"},
{file = "schedule-1.2.1.tar.gz", hash = "sha256:843bc0538b99c93f02b8b50e3e39886c06f2d003b24f48e1aa4cadfa3f341279"},
]
[[package]]
name = "scipy"
version = "1.11.3"
@ -7368,4 +7379,4 @@ web-server = ["dash", "dash-auth", "dash-bootstrap-components", "matplotlib", "n
[metadata]
lock-version = "2.0"
python-versions = ">=3.11,<3.13"
content-hash = "5ca44ede811dc417faeda6b976c032682be7b4edadc16fc6c81e2ffe3dc4f946"
content-hash = "74469846b7987ea0e7fa202cfa3d2406513f7ef02d849f38f3bfca49dd1d71c9"

View File

@ -70,6 +70,7 @@ pymongo = "^4.6.0"
python = ">=3.11,<3.13"
python-dotenv = "^1.0.0"
rapidfuzz = "^3.5.2"
schedule = "^1.2.1"
scipy = "^1.11.3"
seaborn = "^0.13.0"
selenium = "^4.15.2"
@ -141,6 +142,7 @@ pytest-repeat = "^0.9.1"
copy-sql = "aki_prj23_transparenzregister.utils.sql.copy_sql:copy_db_cli"
data-processing = "aki_prj23_transparenzregister.utils.data_processing:cli"
data-transformation = "aki_prj23_transparenzregister.utils.data_transfer:transfer_data_cli"
fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:fetch_news_cli"
reset-sql = "aki_prj23_transparenzregister.utils.sql.connector:reset_all_tables_cli"
webserver = "aki_prj23_transparenzregister.ui.app:main"

View File

@ -0,0 +1,101 @@
"""Scheduled news article extraction and transfer to MongoDB."""
import argparse
import sys
import time
from loguru import logger
from schedule import every, run_pending
from aki_prj23_transparenzregister.config.config_providers import (
HELP_TEXT_CONFIG,
ConfigProvider,
get_config_provider,
)
from aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt import (
HandelsblattRSS,
)
from aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau import (
TagesschauAPI,
)
from aki_prj23_transparenzregister.utils.logger_config import (
add_logger_options_to_argparse,
configer_logger,
)
from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
from aki_prj23_transparenzregister.utils.mongo.news_mongo_service import (
MongoNewsService,
)
def fetch_news_cli() -> None: # pragma: no cover
"""A cli interface to fetch latest news articles on a schedule."""
parser = argparse.ArgumentParser(
prog="Fetch News on schedule",
description="Fetch latest news articles from various sources on a schedule and transfer them to MongoDB.",
epilog="Example: 'fetch-news-schedule secrets.json' or 'fetchh-news-schedule ENV_VARS_'",
)
parser.add_argument(
"config",
metavar="config",
default="ENV",
help=HELP_TEXT_CONFIG,
)
add_logger_options_to_argparse(parser)
parsed = parser.parse_args(sys.argv[1:])
configer_logger(namespace=parsed)
config_provider = get_config_provider(parsed.config)
every(12).hours.do(schedule, config_provider)
while True:
run_pending()
time.sleep(1)
def schedule(config_provider: ConfigProvider) -> int:
"""Scheduled job to fetch news articles and transfer them to MongoDB.
Args:
config_provider (ConfigProvider): ConfigProvider to get the MongoDB connection string
Returns:
int: Number of new documents inserted into MongoDB
"""
logger.info("Starting scheduled job")
mongo_news_service = MongoNewsService(
MongoConnector(config_provider.get_mongo_connection_string())
)
handelsblatt = HandelsblattRSS()
tagesschau = TagesschauAPI()
news_handelsblatt = handelsblatt.get_news_for_category()
if news_handelsblatt is None:
logger.error("Error while fetching news from Handelsblatt")
news_handelsblatt = []
news_tageschau = tagesschau.get_news_for_category()
if news_tageschau is None:
logger.error("Error while fetching news from Tagesschau")
news_tageschau = []
logger.info(f"Found {len(news_handelsblatt)} news articles from Handelsblatt")
logger.info(f"Found {len(news_tageschau)} news articles from Tagesschau")
news_joined = news_handelsblatt + news_tageschau
count_new_documents = 0
count_duplicate_documents = 0
for news in news_joined:
db_news = mongo_news_service.get_by_id(news.id)
if db_news is None:
mongo_news_service.insert(news)
count_new_documents += 1
else:
count_duplicate_documents += 1
logger.info(f"Inserted {count_new_documents} news documents")
logger.info(
f"Found {count_duplicate_documents} duplicate news documents while inserting"
)
logger.info("Finished scheduled job")
logger.info("=========================================")
return count_new_documents

View File

@ -0,0 +1 @@
"""Data extraction of news articles from various sources."""

View File

@ -0,0 +1,40 @@
"""Base class for news extractors."""
import abc
from aki_prj23_transparenzregister.models.news import News
class BaseNewsExtractor(metaclass=abc.ABCMeta):
"""Base class for news extractors."""
base_url: str
def __init__(self, base_url: str):
"""Constructor.
Args:
base_url (str): Base URL of the API.
"""
self.base_url = base_url
@abc.abstractmethod
def get_news_for_category(self, category: str) -> list[News] | None:
"""Retrieve news for the given category from the API implemented by the subclass.
Args:
category (str): News category to retrieve.
Returns:
list[News] | None: List of news or None if an error occured.
"""
@abc.abstractmethod
def __get_news_details_text__(self, url: str) -> str:
"""Retrieve the text of a news article.
Args:
url (str): URL of the news article.
Returns:
str: Text of the news article.
"""

View File

@ -0,0 +1,84 @@
"""Handelsblatt RSS news extractor."""
from datetime import datetime
import requests
import xmltodict
from bs4 import BeautifulSoup
from loguru import logger
from selenium import webdriver
from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.base import (
BaseNewsExtractor,
)
class HandelsblattRSS(BaseNewsExtractor):
"""Handelsblatt RSS news extractor."""
def __init__(self) -> None:
"""Constructor."""
super().__init__("https://www.handelsblatt.com/contentexport/feed")
@logger.catch(reraise=True)
def get_news_for_category(self, category: str = "unternehmen") -> list[News] | None:
"""Retrieve news for the given category from the Handelsblatt RSS feed.
Args:
category (str, optional): Category to search for. Defaults to "unternehmen".
Returns:
list[News] | None: List of news or None if an error occured.
"""
url = f"{self.base_url}/{category}"
result = requests.get(url=url, timeout=60)
if not result.ok:
return None
news: list[News] = []
items = xmltodict.parse(result.text)["rss"]["channel"]["item"]
for article in items:
news.append(
News(
id=article["guid"],
title=article["title"],
date=datetime.strptime(
article["pubDate"], "%a, %d %b %Y %H:%M:%S %z"
).strftime("%Y-%m-%dT%H:%M:%S%z"),
text=self.__get_news_details_text__(article["link"]),
source_url=article["link"],
)
)
return news
def __get_news_details_text__(self, url: str) -> str:
"""Retrieve the text of a news article.
Args:
url (str): URL of the news article.
Returns:
str: Text of the news article.
"""
options = webdriver.ChromeOptions()
preferences = {
"profile.default_content_settings.popups": 0,
"safebrowsing.enabled": True,
}
options.add_argument("--headless=new")
options.add_argument("--disable-gpu")
options.add_experimental_option("prefs", preferences)
options.add_experimental_option("excludeSwitches", ["enable-logging"])
# Arguments required for running Chrome in Docker
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(options=options)
driver.get(url)
content = driver.page_source
soup = BeautifulSoup(content, features="html.parser")
return " ".join(
[elem.text.replace("\n", " ") for elem in soup.find_all("p")][:]
).strip()

View File

@ -0,0 +1,63 @@
"""Tageschau API news extractor."""
import requests
from bs4 import BeautifulSoup
from loguru import logger
from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.base import (
BaseNewsExtractor,
)
class TagesschauAPI(BaseNewsExtractor):
"""Tageschau API news extractor."""
def __init__(self) -> None:
"""Constructor."""
super().__init__("https://www.tagesschau.de/api2")
@logger.catch(reraise=True)
def get_news_for_category(self, category: str = "wirtschaft") -> list[News] | None:
"""Retrieve news for the given category from the Tageschau API.
Args:
category (str, optional): Category to search for. Defaults to "wirtschaft".
Returns:
list[News] | None: List of news or None if an error occured.
"""
url = f"{self.base_url}/news/"
regions = ",".join([str(i) for i in range(1, 16)])
result = requests.get(
url=url, params={"regions": regions, "ressort": category}, timeout=60
)
if not result.ok:
return None
news = []
for item in result.json()["news"]:
news.append(
News(
id=item["externalId"],
title=item["title"],
date=item["date"],
source_url=item["detailsweb"],
text=self.__get_news_details_text__(item["detailsweb"]),
)
)
return news
def __get_news_details_text__(self, url: str) -> str:
"""Retrieve the text of a news article.
Args:
url (str): URL of the news article.
Returns:
str: Text of the news article.
"""
content = requests.get(url, timeout=60)
soup = BeautifulSoup(content.text, features="html.parser")
return " ".join(
[elem.text.replace("\n", " ") for elem in soup.find_all("p")][1:]
).strip()

View File

@ -20,7 +20,7 @@ class MongoNewsService:
"""Get all News documents.
Returns:
list[News]: _description_
list[News]: List of News documents
"""
result = self.collection.find()
return [MongoEntryTransformer.transform_outgoing(elem) for elem in result]
@ -29,10 +29,10 @@ class MongoNewsService:
"""Get a News document by the given id.
Args:
id (str): _description_
id (str): ID of the News document
Returns:
News | None: _description_
News | None: News document or None if not found
"""
result = list(self.collection.find({"_id": id}))
if len(result) == 1:
@ -43,10 +43,10 @@ class MongoNewsService:
"""Insert a new News document.
Args:
news (News): _description_
news (News): News article to be inserted
Returns:
_type_: _description_
InsertOneResult: Result of the insert operation
"""
return self.collection.insert_one(MongoEntryTransformer.transform_ingoing(news))

View File

@ -0,0 +1,93 @@
"""Testing apps/fetch_news.py."""
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.apps import fetch_news
from aki_prj23_transparenzregister.models.news import News
def test_import() -> None:
assert fetch_news is not None
@patch("aki_prj23_transparenzregister.apps.fetch_news.MongoNewsService")
@patch("aki_prj23_transparenzregister.apps.fetch_news.MongoConnector")
@patch("aki_prj23_transparenzregister.apps.fetch_news.HandelsblattRSS")
@patch("aki_prj23_transparenzregister.apps.fetch_news.TagesschauAPI")
def test_schedule(
mock_tagesschau_api: Mock,
mock_handelsblatt_rss: Mock,
mock_mongo_connector: Mock,
mock_mongo_news_service: Mock,
) -> None:
mock_mongo_connector.return_value = Mock()
mock_mongo_news_service.return_value = Mock(
get_by_id=Mock(return_value=None), insert=Mock(return_value=Mock)
)
mock_news_handelsblatt = [
News(
id="test",
title="The oldest and strongest emotion of mankind is fear, and the oldest and strongest kind of fear is fear of the unknown",
date="2023-11-10T09:10:27+0100",
source_url="https://www.handelsblatt.com/test",
text="",
),
News(
id="test",
title="That is not dead which can eternal lie, And with strange aeons even death may die.",
date="2023-11-10T09:10:27+0100",
source_url="https://www.handelsblatt.com/test",
text="",
),
]
mock_news_tagesschau = [
News(
id="test",
title="I know always that I am an outsider; a stranger in this century and among those who are still men.",
date="2023-11-10T09:10:27+0100",
source_url="https://www.tagesschau.de/test",
text="",
),
News(
id="test",
title="Ph'nglui mglw'nafh Cthulhu R'lyeh wgah'nagl fhtagn.",
date="2023-11-10T09:10:27+0100",
source_url="https://www.tagesschau.de/test",
text="",
),
]
mock_tagesschau_api.return_value = Mock(
get_news_for_category=Mock(return_value=mock_news_tagesschau)
)
mock_handelsblatt_rss.return_value = Mock(
get_news_for_category=Mock(return_value=mock_news_handelsblatt)
)
assert fetch_news.schedule(Mock()) == len(
mock_news_handelsblatt + mock_news_tagesschau
)
@patch("aki_prj23_transparenzregister.apps.fetch_news.MongoNewsService")
@patch("aki_prj23_transparenzregister.apps.fetch_news.MongoConnector")
@patch("aki_prj23_transparenzregister.apps.fetch_news.HandelsblattRSS")
@patch("aki_prj23_transparenzregister.apps.fetch_news.TagesschauAPI")
def test_schedule_error(
mock_tagesschau_api: Mock,
mock_handelsblatt_rss: Mock,
mock_mongo_connector: Mock,
mock_mongo_news_service: Mock,
) -> None:
mock_mongo_connector.return_value = Mock()
mock_mongo_news_service.return_value = Mock(
get_by_id=Mock(return_value=None), insert=Mock(return_value=Mock)
)
mock_news_handelsblatt = None
mock_news_tagesschau = None
mock_tagesschau_api.return_value = Mock(
get_news_for_category=Mock(return_value=mock_news_tagesschau)
)
mock_handelsblatt_rss.return_value = Mock(
get_news_for_category=Mock(return_value=mock_news_handelsblatt)
)
assert fetch_news.schedule(Mock()) == 0

View File

@ -0,0 +1,89 @@
"""Testing module for Handelsblatt RSS API."""
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt import (
HandelsblattRSS,
)
def test_init() -> None:
api = HandelsblattRSS()
assert api is not None
assert api.base_url == "https://www.handelsblatt.com/contentexport/feed"
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt.requests.get"
)
def test_get_news_for_category_error(mock_requests_get: Mock) -> None:
mock_requests_get.return_value = Mock(ok=False)
api = HandelsblattRSS()
assert api.get_news_for_category() is None
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt.requests.get"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt.HandelsblattRSS.__get_news_details_text__"
)
def test_get_news_for_category(mock_get_details: Mock, mock_requests_get: Mock) -> None:
mock_get_details.return_value = (
"Es war einmal vor langer Zeit, in einer weit, weit entfernten Galaxis..."
)
mock_response = """<rss version="2.0">
<channel>
<item>
<guid>test</guid>
<title>Test</title>
<pubDate>Fri, 10 Nov 2023 09:10:27 +0100</pubDate>
<link>https://www.handelsblatt.com/test</link>
</item>
<item>
<guid>test</guid>
<title>Test</title>
<pubDate>Fri, 10 Nov 2023 09:10:27 +0100</pubDate>
<link>https://www.handelsblatt.com/test</link>
</item>
</channel>
</rss>
"""
mock_requests_get.return_value = Mock(ok=True, text=mock_response)
api = HandelsblattRSS()
assert api.get_news_for_category() == [
News(
id="test",
title="Test",
date="2023-11-10T09:10:27+0100",
source_url="https://www.handelsblatt.com/test",
text="Es war einmal vor langer Zeit, in einer weit, weit entfernten Galaxis...",
),
News(
id="test",
title="Test",
date="2023-11-10T09:10:27+0100",
source_url="https://www.handelsblatt.com/test",
text="Es war einmal vor langer Zeit, in einer weit, weit entfernten Galaxis...",
),
]
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt.webdriver.Chrome"
)
def test_get_news_details_text(mock_driver: Mock) -> None:
mock_response = """
<html>
<body>
<p>Hallo Welt.</p>
<p>Dies ist ein Text.</p>
</body>
</html>
"""
mock_driver.return_value = Mock(page_source=mock_response)
api = HandelsblattRSS()
assert api.__get_news_details_text__("test") == "Hallo Welt. Dies ist ein Text."

View File

@ -0,0 +1,76 @@
"""Testing module for Tagesschau API."""
from unittest.mock import Mock, patch
from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau import (
TagesschauAPI,
)
def test_init() -> None:
api = TagesschauAPI()
assert api is not None
assert api.base_url == "https://www.tagesschau.de/api2"
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau.requests.get"
)
def test_get_news_for_category_error(mock_requests_get: Mock) -> None:
mock_requests_get.return_value = Mock(ok=False)
api = TagesschauAPI()
assert api.get_news_for_category() is None
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau.requests.get"
)
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau.TagesschauAPI.__get_news_details_text__"
)
def test_get_news_for_category(mock_get_details: Mock, mock_requests_get: Mock) -> None:
mock_get_details.return_value = (
"Es war einmal vor langer Zeit, in einer weit, weit entfernten Galaxis..."
)
mock_response = {
"news": [
{
"externalId": "test",
"title": "Test",
"date": "2021-07-05",
"detailsweb": "https://www.tagesschau.de/test",
}
]
}
mock_requests_get.return_value = Mock(ok=True, json=lambda: mock_response)
api = TagesschauAPI()
assert api.get_news_for_category() == [
News(
id="test",
title="Test",
date="2021-07-05",
source_url="https://www.tagesschau.de/test",
text="Es war einmal vor langer Zeit, in einer weit, weit entfernten Galaxis...",
)
]
@patch(
"aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau.requests.get"
)
def test_get_news_details_text(mock_requests_get: Mock) -> None:
mock_response = """
<html>
<body>
<p>Title to be ignored</p>
<p>Hallo Welt.</p>
<p>Dies ist ein Text.</p>
</body>
</html>
"""
mock_requests_get.return_value = Mock(ok=True, text=mock_response)
api = TagesschauAPI()
assert api.__get_news_details_text__("test") == "Hallo Welt. Dies ist ein Text."