Created pipeline to run NER, sentiment, and SQL ingest (#314)

Created a data-processing pipeline that enriches the raw mined data with
organisation extraction and sentiment analysis prior to moving the data
to the SQL db.
The transfer of matched data is done afterwards.
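The end-to-end order matches the new data_processing entrypoint further down in this diff; a minimal sketch of the same flow, assuming the package and a secrets.json are available:

from aki_prj23_transparenzregister.ai.ner_pipeline import execute_ner
from aki_prj23_transparenzregister.ai.sentiment_pipeline import execute_sentiment
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
from aki_prj23_transparenzregister.utils.data_transfer import transfer_data
from aki_prj23_transparenzregister.utils.transfer_news import transfer_news_to_sql

config_provider = JsonFileConfigProvider("./secrets.json")
execute_ner(config_provider)           # organisation extraction (NER)
execute_sentiment(config_provider)     # sentiment analysis
transfer_data(config_provider)         # move the matched data into the SQL db
transfer_news_to_sql(config_provider)  # transfer the enriched news articles afterwards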

---------

Co-authored-by: SeZett <zeleny.sebastian@fh-swf.de>
Philipp Horstenkamp 2023-11-11 14:28:12 +01:00 committed by GitHub
parent a6d486209a
commit 066800123d
12 changed files with 206 additions and 132 deletions

View File

@ -34,7 +34,7 @@ LABEL PART="DATA-TRANSFORMATION"
RUN pip install --find-links=dist aki-prj23-transparenzregister[transformation] --no-cache-dir && \
rm dist/ -R
ENTRYPOINT ["data-transformation", "ENV"]
ENTRYPOINT ["data-processing", "ENV"]
CMD ["--level", "DEBUG"]
FROM base as web-server

View File

@ -17,6 +17,7 @@ See the [CONTRIBUTING.md](CONTRIBUTING.md) about how code should be formatted an
The project currently has the following entrypoints available:
- **data-transformation** > Transfers all the data from the mongodb into the sql db to make it available as production data.
- **data-processing** > Processes the data using NLP methods and transfers matched data into the SQL table ready for use.
- **reset-sql** > Resets all sql tables in the connected db.
- **copy-sql** > Copies the content of one db to another db.
- **webserver** > Starts the webserver showing the analysis results.
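For example, the new entrypoint can be invoked as `data-processing ENV` (configuration read from environment variables, as in the Docker image) or `data-processing secrets.json` (configuration read from a JSON secrets file).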

View File

@ -139,6 +139,7 @@ pytest-repeat = "^0.9.1"
[tool.poetry.scripts]
copy-sql = "aki_prj23_transparenzregister.utils.sql.copy_sql:copy_db_cli"
data-processing = "aki_prj23_transparenzregister.utils.data_processing:cli"
data-transformation = "aki_prj23_transparenzregister.utils.data_transfer:transfer_data_cli"
reset-sql = "aki_prj23_transparenzregister.utils.sql.connector:reset_all_tables_cli"
webserver = "aki_prj23_transparenzregister.ui.app:main"
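Each entry under [tool.poetry.scripts] becomes a console script on installation; the new data-processing command dispatches to cli in aki_prj23_transparenzregister.utils.data_processing.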

View File

@ -1,7 +1,8 @@
"""Pipeline to get Entities from Staging DB."""
import json
import os
import sys
from typing import Literal, get_args
from loguru import logger
from tqdm import tqdm
@ -10,9 +11,13 @@ import aki_prj23_transparenzregister.utils.mongo.connector as conn
import aki_prj23_transparenzregister.utils.mongo.news_mongo_service as news
from aki_prj23_transparenzregister.ai import ner_service
from aki_prj23_transparenzregister.config.config_providers import (
ConfigProvider,
JsonFileConfigProvider,
)
ner_methods = Literal["spacy", "company_list", "transformer"]
doc_attribs = Literal["text", "title"]
logger.add(sys.stdout, colorize=True)
@ -26,39 +31,48 @@ class EntityPipeline:
self.news_obj = news.MongoNewsService(self.connector)
def process_documents(
self, entity: str, doc_attrib: str, ner_selection: str
self,
doc_attrib: doc_attribs,
ner_method: ner_methods,
) -> None:
"""Method to check documents, get entities and write them to document."""
CursorUnprogressed = self.news_obj.collection.find( # noqa: N806
"""Method to check documents, get entities and write them to document.
Args:
doc_attrib: Defines whether the headline ("title") or the body ("text") of an article should be used to find organisations.
ner_method: The method used to identify organisations.
"""
cursor_unprocessed = self.news_obj.collection.find(
{"companies": {"$exists": False}}
)
documents = list(CursorUnprogressed)
logger.info("Dokumente: ", str(CursorUnprogressed))
documents = list(cursor_unprocessed)
logger.info("Dokumente: ", str(cursor_unprocessed))
# Determine NER service based on config
# spaCy
if ner_selection == "use_spacy_ner":
if ner_method == "spacy":
ner_service_instance = ner_service.NerAnalysisService(
use_spacy=True, use_transformer=False, use_companylist=False
)
ner_service_func = ner_service_instance.ner_spacy
# company list
elif ner_selection == "use_companylist_ner":
elif ner_method == "company_list":
ner_service_instance = ner_service.NerAnalysisService(
use_spacy=False, use_transformer=False, use_companylist=True
)
ner_service_func = ner_service_instance.ner_company_list
# transformer
elif ner_selection == "use_transformer_ner":
elif ner_method == "transformer":
ner_service_instance = ner_service.NerAnalysisService(
use_spacy=False, use_transformer=True, use_companylist=False
)
ner_service_func = ner_service_instance.ner_transformer
else:
raise ValueError(f"Unknown ner_method: {ner_method}")
if len(documents) > 0:
for document in tqdm(documents):
ents = ner_service_func(document, entity, doc_attrib)
ents = ner_service_func(document, "ORG", doc_attrib)
self.news_obj.collection.update_one(
{"_id": document["_id"]},
{"$set": {"companies": ents}},
@ -67,43 +81,37 @@ class EntityPipeline:
logger.info("No documents found.")
if __name__ == "__main__":
# Establish MongoDB Connection using secrets
config_provider = JsonFileConfigProvider("./secrets.json")
def execute_ner(config_provider: ConfigProvider) -> None:
"""Executes the ner pipline.
Args:
config_provider: A config provider defining the MongoDB to read from and write to.
"""
connect_string = config_provider.get_mongo_connection_string()
# dir of config json
config_file_path = (
"src/aki_prj23_transparenzregister/utils/mongo/ner_sentiment_config.json"
)
# Load NER service configuration from JSON
with open(config_file_path) as config_file:
ner_config = json.load(config_file)
# read configuration
entity = ner_config["ner_service"]["entity"]
logger.info("NER Pipeline: searching for entity of type", str(entity))
doc_attrib = ner_config["ner_service"]["doc_attrib"]
logger.info("NER Pipeline: searching in document attribute ", str(doc_attrib))
# read selected service
if ner_config["ner_service"]["use_companylist_ner"] is True:
ner_selection = "use_companylist_ner"
logger.info("NER Pipeline: Searching entities with company list")
elif ner_config["ner_service"]["use_spacy_ner"] is True:
ner_selection = "use_spacy_ner"
logger.info("NER Pipeline: Searching entities with spaCy")
elif ner_config["ner_service"]["use_transformer_ner"] is True:
ner_selection = "use_transformer_ner"
logger.info("NER Pipeline: Searching entities with transformer")
else:
logger.info(
"NER Pipeline: No NER services selected or error in configuration file."
if (
ner_method := os.getenv("PYTHON_NER_METHOD", "transformer").lower()
) not in get_args(ner_methods):
raise ValueError(
f"Please use either {', '.join(get_args(ner_methods))} as an ENV variable defining your ner method."
f"Currently used is {ner_method}"
)
if (doc_attrib := os.getenv("PYTHON_NER_DOC", "text").lower()) not in get_args(
doc_attribs
):
raise ValueError(
f"Please use either {', '.join(get_args(doc_attribs))} as an ENV variable defining your ner document."
f"Currently used is {doc_attrib}"
)
# read configuration
logger.info(
f"NER Pipeline: searching in document attribute {doc_attrib} using {ner_method}"
)
entity_pipeline = EntityPipeline(connect_string)
entity_pipeline.process_documents(entity, doc_attrib, ner_selection)
entity_pipeline.process_documents(doc_attrib=doc_attrib, ner_method=ner_method) # type: ignore
if __name__ == "__main__":
# Establish MongoDB Connection using secrets
execute_ner(JsonFileConfigProvider("./secrets.json"))
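The NER configuration now comes from environment variables instead of the removed JSON file; a minimal sketch of driving execute_ner this way (variable names, accepted values, and defaults taken from the code above):

import os

# Optional overrides; the defaults are "transformer" and "text".
os.environ["PYTHON_NER_METHOD"] = "spacy"  # one of: spacy, company_list, transformer
os.environ["PYTHON_NER_DOC"] = "title"     # one of: text, title

from aki_prj23_transparenzregister.ai.ner_pipeline import execute_ner
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider

execute_ner(JsonFileConfigProvider("./secrets.json"))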

View File

@ -1,16 +0,0 @@
{
"ner_service": {
"comment": "Select only one service by setting true and deselect the other with false. Valid doc_attrib: text, title",
"doc_attrib": "text",
"entity": "ORG",
"use_companylist_ner": false,
"use_spacy_ner": false,
"use_transformer_ner": true
},
"sentiment_service": {
"comment": "Select only one service by setting true and deselect the other with false. Valid doc_attrib: text, title",
"doc_attrib": "text",
"use_spacy": false,
"use_transformer": true
}
}

View File

@ -1,7 +1,7 @@
"""Pipeline to get sentiments from Staging DB nes articles."""
import json
import os
from typing import Literal, get_args
from loguru import logger
from tqdm import tqdm
@ -9,9 +9,15 @@ from tqdm import tqdm
import aki_prj23_transparenzregister.utils.mongo.connector as conn
import aki_prj23_transparenzregister.utils.mongo.news_mongo_service as news
from aki_prj23_transparenzregister.ai import sentiment_service
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
from aki_prj23_transparenzregister.config.config_providers import (
ConfigProvider,
JsonFileConfigProvider,
)
from aki_prj23_transparenzregister.config.config_template import MongoConnection
doc_attribs = Literal["text", "title"]
sentiment_methods = Literal["spacy", "transformer"]
class SentimentPipeline:
"""Class to initialize sentiment Pipeline."""
@ -22,25 +28,34 @@ class SentimentPipeline:
self.connector = conn.MongoConnector(self.connect_string)
self.news_obj = news.MongoNewsService(self.connector)
def process_documents(self, doc_attrib: str, sentiment_selection: str) -> None:
"""Method to check documents, get entities and write them to document."""
CursorUnprogressed = self.news_obj.collection.find( # noqa: N806
def process_documents(
self,
doc_attrib: doc_attribs,
sentiment_method: sentiment_methods,
) -> None:
"""Method to check documents, get entities and write them to document.
Args:
doc_attrib: Defines whether the headline ("title") or the body ("text") of an article should be used to determine its sentiment.
sentiment_method: The method used to perform the sentiment analysis.
"""
cursor_unprocessed = self.news_obj.collection.find(
{"sentiment": {"$exists": False}}
)
documents = list(CursorUnprogressed)
documents = list(cursor_unprocessed)
if len(documents) > 0:
for document in tqdm(documents):
text = document[doc_attrib]
# Determine sentiment analysis service based on config
if sentiment_selection == "use_spacy":
if sentiment_method == "spacy":
selected_service = sentiment_service.SentimentAnalysisService(
use_spacy=True, use_transformer=False
)
sentiment_service_func = selected_service.sentiment_spacy
elif sentiment_selection == "use_transformer":
elif sentiment_method == "transformer":
selected_service = sentiment_service.SentimentAnalysisService(
use_spacy=False, use_transformer=True
)
@ -56,34 +71,39 @@ class SentimentPipeline:
logger.info("No documents found.")
if __name__ == "__main__":
# Establish MongoDB Connection using secrets
config_provider = JsonFileConfigProvider("./secrets.json")
connect_string = config_provider.get_mongo_connection_string()
def execute_sentiment(config_provider: ConfigProvider) -> None:
"""Reads entries with missing data from the db and fills them with found sentiments.
# dir of config json
script_dir = os.path.dirname(__file__)
config_file_path = os.path.join(script_dir, "ner_sentiment_config.json")
# Load sentiment service configuration from JSON
with open(config_file_path) as config_file:
sentiment_config = json.load(config_file)
# Where to search the sentiment
doc_attrib = sentiment_config["sentiment_service"]["doc_attrib"]
logger.info("Sentiment Pipeline: searching in document attribute ", str(doc_attrib))
# read selected service
if sentiment_config["sentiment_service"]["use_spacy"] is True:
sentiment_selection = "use_spacy"
logger.info("Sentiment Pipleline: Searching sentiments with spaCy")
elif sentiment_config["sentiment_service"]["use_transformer"] is True:
sentiment_selection = "use_transformer"
logger.info("Sentiment Pipleline: Searching sentiments with transformer")
else:
logger.info(
"Sentiment Pipleline: No Sentiment services selected or error in configuration file."
Args:
config_provider: A config provider defining the MongoDB to read from and write to.
"""
if (
sentiment_method := os.getenv("PYTHON_SENTIMENT_METHOD", "transformer").lower()
) not in get_args(sentiment_methods):
raise ValueError(
f"Please use either {', '.join(get_args(sentiment_methods))} as an ENV variable defining your sentiment method."
f"Currently used is {sentiment_method}"
)
if (
doc_attrib := os.getenv("PYTHON_SENTIMENT_DOC", "text").lower()
) not in get_args(doc_attribs):
raise ValueError(
f"Please use either {', '.join(get_args(doc_attribs))} as an ENV variable defining your sentiment document."
f"Currently used is {doc_attrib}"
)
sentiment_pipeline = SentimentPipeline(connect_string)
sentiment_pipeline.process_documents(doc_attrib, sentiment_selection)
# read configuration
logger.info(
f"Sentiment Pipeline: searching in document attribute {doc_attrib} using {sentiment_method}"
)
# read selected service
sentiment_pipeline = SentimentPipeline(
config_provider.get_mongo_connection_string()
)
sentiment_pipeline.process_documents(doc_attrib, sentiment_method) # type: ignore
if __name__ == "__main__":
# Establish MongoDB Connection using secrets
execute_sentiment(JsonFileConfigProvider("./secrets.json"))
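As with the NER pipeline, the sentiment configuration is read from environment variables; a minimal sketch (names, values, and defaults per the code above):

import os

# Optional overrides; the defaults are "transformer" and "text".
os.environ["PYTHON_SENTIMENT_METHOD"] = "spacy"  # one of: spacy, transformer
os.environ["PYTHON_SENTIMENT_DOC"] = "title"     # one of: text, title

from aki_prj23_transparenzregister.ai.sentiment_pipeline import execute_sentiment
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider

execute_sentiment(JsonFileConfigProvider("./secrets.json"))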

View File

@ -0,0 +1,56 @@
"""Used to process data from the MongoDB to be ready for use in the SQL db."""
import argparse
import sys
from aki_prj23_transparenzregister.ai.ner_pipeline import execute_ner
from aki_prj23_transparenzregister.ai.sentiment_pipeline import execute_sentiment
from aki_prj23_transparenzregister.config.config_providers import (
HELP_TEXT_CONFIG,
ConfigProvider,
get_config_provider,
)
from aki_prj23_transparenzregister.utils.data_transfer import transfer_data
from aki_prj23_transparenzregister.utils.logger_config import (
add_logger_options_to_argparse,
configer_logger,
)
from aki_prj23_transparenzregister.utils.transfer_news import transfer_news_to_sql
def process_and_transfer_data(
config_provider: ConfigProvider,
) -> None: # pragma: no cover
"""Method to process and transfer all the data that can be found in the MongoDB in the SQL DB.
Args:
config_provider: A config provider defining the MongoDB to read from and write to.
"""
execute_ner(config_provider)
execute_sentiment(config_provider)
transfer_data(config_provider)
transfer_news_to_sql(config_provider)
def cli() -> None: # pragma: no cover
"""A cli interface for the data transfer."""
parser = argparse.ArgumentParser(
prog="Process and transform data",
description="Process the raw data from the MongoDB with AI models and match and transform the data from the MongoDB when transfering into the SQL DB.",
epilog="Example: 'data-processing secrets.json' or 'data-processing ENV'",
)
parser.add_argument(
"config",
metavar="config",
default="ENV",
help=HELP_TEXT_CONFIG,
)
add_logger_options_to_argparse(parser)
parsed = parser.parse_args(sys.argv[1:])
configer_logger(namespace=parsed)
config = get_config_provider(parsed.config)
process_and_transfer_data(config)
if __name__ == "__main__":
configer_logger(level="info", path="")
process_and_transfer_data(get_config_provider("secrets.json"))
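In the Docker image above, ENTRYPOINT and CMD combine so the container runs the equivalent of data-processing ENV --level DEBUG by default.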

View File

@ -172,7 +172,7 @@ def get_all_news(config_provider: ConfigProvider) -> list:
).get_all()
def transfer_news_to_sql(config_provider: ConfigProvider, db: Session) -> None:
def _transfer_news_to_sql(config_provider: ConfigProvider, db: Session) -> None:
"""Transfers news from the mongodb into the sql db.
Args:
@ -191,6 +191,16 @@ def transfer_news_to_sql(config_provider: ConfigProvider, db: Session) -> None:
)
def transfer_news_to_sql(config_provider: ConfigProvider) -> None:
"""Transfers news from the mongodb into the sql db.
Args:
config_provider: The configuration provider to connect to the mongodb.
"""
_transfer_news_to_sql(config_provider, get_session(config_provider))
if __name__ == "__main__":
jconfig_provider = JsonFileConfigProvider("secrets2.json")
transfer_news_to_sql(jconfig_provider, get_session(jconfig_provider))
transfer_news_to_sql(jconfig_provider)

View File

@ -83,9 +83,7 @@ def test_entity_pipeline_with_spacy(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with spaCy NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_spacy_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="spacy")
# Ensure that ner_spacy was called with the correct parameters
mock_ner_spacy.assert_called_once_with(mock_documents[0], "ORG", "title")
@ -121,9 +119,7 @@ def test_entity_pipeline_with_spacy_no_docs(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with spaCy NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_spacy_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="spacy")
# Ensure that sentiment_spacy was not called
mock_ner_spacy.assert_not_called()
@ -135,14 +131,14 @@ def test_entity_pipeline_with_spacy_no_docs(
@patch(
"aki_prj23_transparenzregister.ai.ner_service.NerAnalysisService.ner_company_list"
)
def test_entity_pipeline_with_companylist_ner(
mock_ner_companylist: Mock,
def test_entity_pipeline_with_company_list_ner(
mock_ner_company_list: Mock,
mock_mongo_connector: Mock,
mock_mongo_connection: MongoConnection,
mock_spacy: Mock,
) -> None:
# Configure the mock object to return a specific NER result
mock_ner_companylist.return_value = {"ORG": 3, "LOCATION": 2}
mock_ner_company_list.return_value = {"ORG": 3, "LOCATION": 2}
# Create an instance of the EntityPipeline
entity_pipeline = EntityPipeline(mock_mongo_connection)
@ -159,12 +155,10 @@ def test_entity_pipeline_with_companylist_ner(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with Company List NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_companylist_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="company_list")
# Check that ner_company_list was called with the correct parameters
mock_ner_companylist.assert_called_once_with(mock_documents[0], "ORG", "title")
mock_ner_company_list.assert_called_once_with(mock_documents[0], "ORG", "title")
# Check that the document in the collection was updated with the NER results
mock_collection.update_one.assert_called_once_with(
@ -176,14 +170,14 @@ def test_entity_pipeline_with_companylist_ner(
@patch(
"aki_prj23_transparenzregister.ai.ner_service.NerAnalysisService.ner_company_list"
)
def test_entity_pipeline_with_companylist_ner_no_docs(
mock_ner_companylist: Mock,
def test_entity_pipeline_with_company_list_ner_no_docs(
mock_ner_company_list: Mock,
mock_mongo_connector: Mock,
mock_mongo_connection: MongoConnection,
mock_spacy: Mock,
) -> None:
# Configure the mock to return a specific NER result
mock_ner_companylist.return_value = {"ORG": 3, "LOCATION": 2}
mock_ner_company_list.return_value = {"ORG": 3, "LOCATION": 2}
# Create an instance of the EntityPipeline
entity_pipeline = EntityPipeline(mock_mongo_connection)
@ -198,18 +192,15 @@ def test_entity_pipeline_with_companylist_ner_no_docs(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with Company List NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_companylist_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="company_list")
# Ensure that ner_company_list is not called
mock_ner_companylist.assert_not_called()
mock_ner_company_list.assert_not_called()
# Ensure that the document in the collection was not updated
mock_collection.update_one.assert_not_called()
# Add more test cases for other NER methods (e.g., company_list, transformer) following a similar pattern.
@patch("aki_prj23_transparenzregister.ai.ner_service.NerAnalysisService.ner_spacy")
def test_entity_pipeline_with_transformer(
mock_ner_transformer: Mock,
@ -234,9 +225,7 @@ def test_entity_pipeline_with_transformer(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with spaCy NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_spacy_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="spacy")
# Ensure that ner_spacy was called with the correct parameters
mock_ner_transformer.assert_called_once_with(mock_documents[0], "ORG", "title")
@ -272,9 +261,7 @@ def test_entity_pipeline_with_transformer_no_docs(
mock_collection.find.return_value = mock_documents
# Call the process_documents method with spaCy NER
entity_pipeline.process_documents(
entity="ORG", doc_attrib="title", ner_selection="use_spacy_ner"
)
entity_pipeline.process_documents(doc_attrib="title", ner_method="spacy")
# Ensure that ner_transformer is not called
mock_ner_transformer.assert_not_called()

View File

@ -92,7 +92,7 @@ def test_sentiment_pipeline_existing_sentiment(
mock_collection.find.return_value = mock_documents
# Call the process_documents method
sentiment_pipeline.process_documents("text", "use_spacy")
sentiment_pipeline.process_documents("text", "spacy")
# Ensure that sentiment_spacy was called with the correct text
mock_sentiment_spacy.assert_called_once_with("This is a positive text.")
@ -124,7 +124,7 @@ def test_sentiment_pipeline_no_documents(
sentiment_pipeline.news_obj.collection = mock_collection
# Call the process_documents method
sentiment_pipeline.process_documents("text", "use_spacy")
sentiment_pipeline.process_documents("text", "spacy")
# Ensure that sentiment_spacy was not called
mock_sentiment_spacy.assert_not_called()
@ -159,7 +159,7 @@ def test_sentiment_pipeline_with_spacy(
mock_collection.find.return_value = mock_documents
# Call the process_documents method
sentiment_pipeline.process_documents("text", "use_spacy")
sentiment_pipeline.process_documents("text", "spacy")
# Ensure that sentiment_spacy was called with the correct text
mock_sentiment_spacy.assert_called_once_with("This is a positive text.")
@ -198,7 +198,7 @@ def test_sentiment_pipeline_with_transformer(
mock_collection.find.return_value = mock_documents
# Call the process_documents method
sentiment_pipeline.process_documents("text", "use_transformer")
sentiment_pipeline.process_documents("text", "transformer")
# Ensure that sentiment_transformer was called with the correct text
mock_sentiment_transformer.assert_called_once_with("This is a negative text.")

View File

@ -0,0 +1,7 @@
"""Tests for the data processing module."""
from aki_prj23_transparenzregister.utils import data_processing
def test_import() -> None:
"""Tests if the data processing module can be imported."""
assert data_processing

View File

@ -130,7 +130,7 @@ def test_transfer_news_to_sql(full_db: Session, monkeypatch: MonkeyPatch) -> Non
"aki_prj23_transparenzregister.utils.transfer_news.get_all_news",
lambda _: NEWS_TEXTS,
)
transfer_news.transfer_news_to_sql(None, full_db) # type: ignore
transfer_news._transfer_news_to_sql(None, full_db) # type: ignore
articles = pd.read_sql_table(entities.News.__tablename__, full_db.bind) # type: ignore
assert "text" in articles.columns
del articles["text"]