checkpoint: Init news extraction components and main app

TrisNol 2023-11-10 13:58:04 +01:00
parent 905021af14
commit a428eb4432
8 changed files with 258 additions and 6 deletions

poetry.lock generated

@@ -5556,6 +5556,17 @@ tensorflow = ["safetensors[numpy]", "tensorflow (>=2.11.0)"]
testing = ["h5py (>=3.7.0)", "huggingface_hub (>=0.12.1)", "hypothesis (>=6.70.2)", "pytest (>=7.2.0)", "pytest-benchmark (>=4.0.0)", "safetensors[numpy]", "setuptools_rust (>=1.5.2)"]
torch = ["safetensors[numpy]", "torch (>=1.10)"]
[[package]]
name = "schedule"
version = "1.2.1"
description = "Job scheduling for humans."
optional = false
python-versions = ">=3.7"
files = [
{file = "schedule-1.2.1-py2.py3-none-any.whl", hash = "sha256:14cdeb083a596aa1de6dc77639a1b2ac8bf6eaafa82b1c9279d3612823063d01"},
{file = "schedule-1.2.1.tar.gz", hash = "sha256:843bc0538b99c93f02b8b50e3e39886c06f2d003b24f48e1aa4cadfa3f341279"},
]
[[package]]
name = "scipy"
version = "1.11.3"
@@ -7368,4 +7379,4 @@ web-server = ["dash", "dash-auth", "dash-bootstrap-components", "matplotlib", "n
[metadata]
lock-version = "2.0"
python-versions = ">=3.11,<3.13"
-content-hash = "5ca44ede811dc417faeda6b976c032682be7b4edadc16fc6c81e2ffe3dc4f946"
+content-hash = "74469846b7987ea0e7fa202cfa3d2406513f7ef02d849f38f3bfca49dd1d71c9"

pyproject.toml

@@ -70,6 +70,7 @@ pymongo = "^4.6.0"
python = ">=3.11,<3.13"
python-dotenv = "^1.0.0"
rapidfuzz = "^3.5.2"
schedule = "^1.2.1"
scipy = "^1.11.3"
seaborn = "^0.13.0"
selenium = "^4.15.2"
@@ -141,6 +142,7 @@ pytest-repeat = "^0.9.1"
copy-sql = "aki_prj23_transparenzregister.utils.sql.copy_sql:copy_db_cli"
data-processing = "aki_prj23_transparenzregister.utils.data_processing:cli"
data-transformation = "aki_prj23_transparenzregister.utils.data_transfer:transfer_data_cli"
fetch-news-schedule = "aki_prj23_transparenzregister.apps.fetch_news:fetch_news_cli"
reset-sql = "aki_prj23_transparenzregister.utils.sql.connector:reset_all_tables_cli"
webserver = "aki_prj23_transparenzregister.ui.app:main"
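The new fetch-news-schedule entry point builds on the schedule package added above. A minimal sketch of the library pattern it relies on (the job body here is a stand-in; the real job is defined in fetch_news.py below):

import time

import schedule


def job() -> None:
    print("fetching news ...")  # stand-in for the real news-fetch job


schedule.every(12).hours.do(job)  # register the job to run every 12 hours

while True:
    schedule.run_pending()  # only fires jobs whose interval has elapsed
    time.sleep(30)

Assuming a standard Poetry setup, the script would be invoked as 'poetry run fetch-news-schedule secrets.json' or 'poetry run fetch-news-schedule ENV' (the exact config argument depends on the ConfigProvider in use).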

aki_prj23_transparenzregister/apps/fetch_news.py

@@ -0,0 +1,97 @@
"""Scheduled news article extraction and transfer to MongoDB."""
import argparse
import sys
import time

from loguru import logger
from schedule import every, run_pending

from aki_prj23_transparenzregister.config.config_providers import (
    HELP_TEXT_CONFIG,
    ConfigProvider,
    get_config_provider,
)
from aki_prj23_transparenzregister.utils.data_extraction.news.handelsblatt import (
    HandelsblattRSS,
)
from aki_prj23_transparenzregister.utils.data_extraction.news.tagesschau import (
    TagesschauAPI,
)
from aki_prj23_transparenzregister.utils.logger_config import (
    add_logger_options_to_argparse,
    configer_logger,
)
from aki_prj23_transparenzregister.utils.mongo.connector import MongoConnector
from aki_prj23_transparenzregister.utils.mongo.news_mongo_service import (
    MongoNewsService,
)


def fetch_news_cli() -> None:
    """A CLI interface to fetch the latest news articles on a schedule."""
    parser = argparse.ArgumentParser(
        prog="fetch-news-schedule",
        description="Fetch the latest news articles on a schedule and store them in MongoDB.",
        epilog="Example: 'fetch-news-schedule secrets.json' or 'fetch-news-schedule ENV_VARS_'",
    )
    parser.add_argument(
        "config",
        metavar="config",
        default="ENV",
        help=HELP_TEXT_CONFIG,
    )
    add_logger_options_to_argparse(parser)
    parsed = parser.parse_args(sys.argv[1:])
    configer_logger(namespace=parsed)
    config_provider = get_config_provider(parsed.config)

    every(12).hours.do(schedule, config_provider)
    while True:
        run_pending()
        time.sleep(30)


def schedule(config_provider: ConfigProvider) -> None:
    """Scheduled job to fetch news articles and transfer them to MongoDB.

    Args:
        config_provider (ConfigProvider): ConfigProvider to get the MongoDB connection string.
    """
    logger.info("Starting scheduled job")
    mongo_news_service = MongoNewsService(
        MongoConnector(config_provider.get_mongo_connection_string())
    )
    handelsblatt = HandelsblattRSS()
    tagesschau = TagesschauAPI()

    news_handelsblatt = handelsblatt.get_news_for_category()
    if news_handelsblatt is None:
        logger.error("Error while fetching news from Handelsblatt")
        news_handelsblatt = []
    news_tagesschau = tagesschau.get_news_for_category()
    if news_tagesschau is None:
        logger.error("Error while fetching news from Tagesschau")
        news_tagesschau = []
    logger.info(f"Found {len(news_handelsblatt)} news articles from Handelsblatt")
    logger.info(f"Found {len(news_tagesschau)} news articles from Tagesschau")

    news_joined = news_handelsblatt + news_tagesschau
    count_new_documents = 0
    count_duplicate_documents = 0
    for news in news_joined:
        db_news = mongo_news_service.get_by_id(news.id)
        if db_news is None:
            mongo_news_service.insert(news)
            count_new_documents += 1
        else:
            count_duplicate_documents += 1
    logger.info(f"Inserted {count_new_documents} news documents")
    logger.info(
        f"Found {count_duplicate_documents} duplicate news documents while inserting"
    )
    logger.info("Finished scheduled job")
    logger.info("=========================================")

aki_prj23_transparenzregister/utils/data_extraction/news/__init__.py

@@ -0,0 +1 @@
"""Data extraction of news articles from various sources."""

aki_prj23_transparenzregister/utils/data_extraction/news/base.py

@@ -0,0 +1,48 @@
"""Base class for news extractors."""
import abc

import requests
from bs4 import BeautifulSoup

from aki_prj23_transparenzregister.models.news import News


class BaseNewsExtractor(metaclass=abc.ABCMeta):
    """Base class for news extractors."""

    base_url: str

    def __init__(self, base_url: str):
        """Constructor.

        Args:
            base_url (str): Base URL of the API.
        """
        self.base_url = base_url

    @abc.abstractmethod
    def get_news_for_category(self, category: str) -> list[News] | None:
        """Retrieve news for the given category from the API implemented by the subclass.

        Args:
            category (str): News category to retrieve.

        Returns:
            list[News] | None: List of news or None if an error occurred.
        """

    def __get_news_details_text__(self, url: str) -> str:
        """Retrieve the text of a news article.

        Args:
            url (str): URL of the news article.

        Returns:
            str: Text of the news article.
        """
        content = requests.get(url, timeout=60)
        soup = BeautifulSoup(content.text, features="html.parser")
        return " ".join(
            elem.text.replace("\n", " ") for elem in soup.find_all("p")
        )
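To show how the base class is meant to be extended, here is a hedged sketch of a further extractor; ExampleJSONFeed, its URL, and its response fields are hypothetical, while the News keyword arguments match those used by the two extractors in this commit:

import requests

from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.base import (
    BaseNewsExtractor,
)


class ExampleJSONFeed(BaseNewsExtractor):
    """Hypothetical JSON-feed extractor, for illustration only."""

    def __init__(self) -> None:
        super().__init__("https://example.com/feed")

    def get_news_for_category(self, category: str = "wirtschaft") -> list[News] | None:
        result = requests.get(f"{self.base_url}/{category}", timeout=60)
        if not result.ok:
            return None  # callers treat None as "source unavailable"
        return [
            News(
                id=item["id"],
                title=item["title"],
                date=item["date"],
                source_url=item["url"],
                # inherited helper that joins the text of all <p> tags on the page
                text=self.__get_news_details_text__(item["url"]),
            )
            for item in result.json()
        ]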

aki_prj23_transparenzregister/utils/data_extraction/news/handelsblatt.py

@@ -0,0 +1,49 @@
"""Handelsblatt RSS news extractor."""
from datetime import datetime

import requests
import xmltodict

from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.base import (
    BaseNewsExtractor,
)


class HandelsblattRSS(BaseNewsExtractor):
    """Handelsblatt RSS news extractor."""

    def __init__(self) -> None:
        """Constructor."""
        super().__init__("https://www.handelsblatt.com/contentexport/feed")

    def get_news_for_category(self, category: str = "unternehmen") -> list[News] | None:
        """Retrieve news for the given category from the Handelsblatt RSS feed.

        Args:
            category (str, optional): Category to search for. Defaults to "unternehmen".

        Returns:
            list[News] | None: List of news or None if an error occurred.
        """
        url = f"{self.base_url}/{category}"
        result = requests.get(url=url, timeout=60)
        if not result.ok:
            return None
        news: list[News] = []
        items = xmltodict.parse(result.text)["rss"]["channel"]["item"]
        for article in items:
            news.append(
                News(
                    id=article["guid"],
                    title=article["title"],
                    date=datetime.strptime(
                        article["pubDate"], "%a, %d %b %Y %H:%M:%S %z"
                    ).strftime("%Y-%m-%dT%H:%M:%S%z"),
                    # FIXME Will now require JS enabled --> Use selenium rather than simple requests
                    text=self.__get_news_details_text__(article["link"]),
                    source_url=article["link"],
                )
            )
        return news
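The parsing above hinges on xmltodict flattening the feed into nested dicts and on the RFC 822-style pubDate format. A small self-contained sketch of both steps on a made-up feed snippet:

from datetime import datetime

import xmltodict

rss = (
    "<rss><channel><item>"
    "<guid>hb-123</guid>"
    "<title>Example headline</title>"
    "<link>https://www.handelsblatt.com/example</link>"
    "<pubDate>Fri, 10 Nov 2023 13:58:04 +0100</pubDate>"
    "</item></channel></rss>"
)

item = xmltodict.parse(rss)["rss"]["channel"]["item"]
# Note: with a single <item>, xmltodict yields a dict; the real feed contains many
# items, which is why the extractor above iterates over a list.
date = datetime.strptime(item["pubDate"], "%a, %d %b %Y %H:%M:%S %z")
print(date.strftime("%Y-%m-%dT%H:%M:%S%z"))  # 2023-11-10T13:58:04+0100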

aki_prj23_transparenzregister/utils/data_extraction/news/tagesschau.py

@@ -0,0 +1,44 @@
"""Tagesschau API news extractor."""
import requests

from aki_prj23_transparenzregister.models.news import News
from aki_prj23_transparenzregister.utils.data_extraction.news.base import (
    BaseNewsExtractor,
)


class TagesschauAPI(BaseNewsExtractor):
    """Tagesschau API news extractor."""

    def __init__(self) -> None:
        """Constructor."""
        super().__init__("https://www.tagesschau.de/api2")

    def get_news_for_category(self, category: str = "wirtschaft") -> list[News] | None:
        """Retrieve news for the given category from the Tagesschau API.

        Args:
            category (str, optional): Category to search for. Defaults to "wirtschaft".

        Returns:
            list[News] | None: List of news or None if an error occurred.
        """
        url = f"{self.base_url}/news/"
        regions = ",".join([str(i) for i in range(1, 16)])
        result = requests.get(
            url=url, params={"regions": regions, "ressort": category}, timeout=60
        )
        if not result.ok:
            return None
        news = []
        for item in result.json()["news"]:
            news.append(
                News(
                    id=item["externalId"],
                    title=item["title"],
                    date=item["date"],
                    source_url=item["detailsweb"],
                    text=self.__get_news_details_text__(item["detailsweb"]),
                )
            )
        return news
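For reference, a sketch of the request built above, using requests.Request to show the final URL without calling the API; the parameter names regions and ressort are taken from the code, and the region ids 1 through 15 mirror the range used there:

import requests

regions = ",".join([str(i) for i in range(1, 16)])  # "1,2,...,15"
prepared = requests.Request(
    "GET",
    "https://www.tagesschau.de/api2/news/",
    params={"regions": regions, "ressort": "wirtschaft"},
).prepare()
print(prepared.url)
# abridged: https://www.tagesschau.de/api2/news/?regions=1%2C2%2C...%2C15&ressort=wirtschaft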

aki_prj23_transparenzregister/utils/mongo/news_mongo_service.py

@@ -20,7 +20,7 @@ class MongoNewsService:
        """Get all News documents.

        Returns:
-            list[News]: _description_
+            list[News]: List of News documents
        """
        result = self.collection.find()
        return [MongoEntryTransformer.transform_outgoing(elem) for elem in result]
@@ -29,10 +29,10 @@ class MongoNewsService:
        """Get a News document by the given id.

        Args:
-            id (str): _description_
+            id (str): ID of the News document

        Returns:
-            News | None: _description_
+            News | None: News document or None if not found
        """
        result = list(self.collection.find({"_id": id}))
        if len(result) == 1:
@@ -43,10 +43,10 @@ class MongoNewsService:
        """Insert a new News document.

        Args:
-            news (News): _description_
+            news (News): News article to be inserted

        Returns:
-            _type_: _description_
+            InsertOneResult: Result of the insert operation
        """
        return self.collection.insert_one(MongoEntryTransformer.transform_ingoing(news))
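Design note: the get-then-insert pattern used by the scheduled job issues two round trips per article. Assuming the article id is stored as the Mongo _id (as the find({"_id": id}) lookup above suggests), a single-call alternative would be pymongo's upsert; a hedged sketch with hypothetical connection, database, and collection names:

from pymongo import MongoClient

# Hypothetical connection string and database/collection names, for illustration only.
collection = MongoClient("mongodb://localhost:27017")["transparenzregister"]["news"]

doc = {"_id": "hb-123", "title": "Example headline"}
result = collection.update_one(
    {"_id": doc["_id"]},
    {"$setOnInsert": doc},  # only written when no document with this _id exists yet
    upsert=True,
)
print("new document" if result.upserted_id is not None else "duplicate")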