feat(data-extraction): MongoWrapper, DataClasses and services for News and Company data

This commit is contained in:
TrisNol 2023-07-10 18:58:31 +02:00
parent 4c65d37816
commit 4c95550dbf
17 changed files with 384 additions and 135 deletions

View File

@ -1,13 +0,0 @@
from dataclasses import asdict, dataclass
@dataclass
class News:
    """Plain record describing one news article.

    Every field is kept as the string that was supplied; the class performs
    no parsing or validation of its own.
    """

    id: str
    title: str
    date: str
    text: str
    source_url: str

    def dict(self):
        """Return the article's fields as a dictionary keyed by field name."""
        field_values = asdict(self)
        return field_values

View File

@ -1,17 +0,0 @@
from abc import ABC
from News.models.News import News
class NewsServiceInterface(ABC):
    """Contract for services that store and retrieve ``News`` entries.

    The methods are not marked abstract: each default implementation raises
    ``NotImplementedError`` until a subclass overrides it.
    """

    def get_all(self) -> list[News]:
        """Return every stored news entry.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def get_by_id(self, id: str) -> News | None:
        """Return the entry identified by ``id``, or ``None`` if there is none.

        Args:
            id: Identifier of the requested entry.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def insert(self, news: News):
        """Store a single news entry.

        Args:
            news: Entry to store.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def insert_many(self, news: list[News]):
        """Store several news entries at once.

        Args:
            news: Entries to store.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()

View File

@ -1,67 +0,0 @@
import pymongo
from News.models.News import News
from News.utils.NewsServiceInterface import NewsServiceInterface
class MongoConnector:
    """Open a MongoDB client and expose one of its databases.

    Attributes:
        client: The ``pymongo.MongoClient`` returned by :meth:`connect`.
        database: Handle for the database named in the constructor.
    """

    def __init__(
        self,
        hostname,
        database: str,
        port: int | None,
        username: str | None = None,
        password: str | None = None,
    ):
        """Connect to the server and select ``database``.

        Args:
            hostname: Host name of the MongoDB server.
            database: Name of the database to use.
            port: Server port, or ``None`` to keep the ``mongodb+srv`` scheme.
            username: Optional user name; only used together with ``password``.
            password: Optional password; only used together with ``username``.
        """
        self.client = self.connect(hostname, port, username, password)
        # Purely informational: tell the operator when the name is not among
        # the databases the server currently reports.
        if database not in self.client.list_database_names():
            print(f"Database {database} will be created")
        self.database = self.client[database]

    def connect(self, hostname, port, username, password) -> pymongo.MongoClient:
        """Create the client for the given server.

        Credentials are handed to ``MongoClient`` as keyword options instead
        of being interpolated into the URI. That way they need no
        percent-escaping and never show up in the printed connection string.

        Args:
            hostname: Host name of the MongoDB server.
            port: Server port, or ``None``.
            username: User name, or ``None`` for an unauthenticated connection.
            password: Password, or ``None`` for an unauthenticated connection.

        Returns:
            The connected ``pymongo.MongoClient``.
        """
        # As before, an explicit port switches from the SRV scheme to the
        # plain ``mongodb`` scheme.
        if port is None:
            connection_string = f"mongodb+srv://{hostname}"
        else:
            connection_string = f"mongodb://{hostname}:{port}"
        # Safe to print: the string no longer contains the password.
        print(connection_string)
        # Credentials are only applied when both parts are present.
        if username is None or password is None:
            return pymongo.MongoClient(connection_string)
        return pymongo.MongoClient(
            connection_string,
            username=username,
            password=password,
        )
class MongoNewsService(NewsServiceInterface):
    """``NewsServiceInterface`` implementation backed by the ``news`` collection."""

    def __init__(self, connector: MongoConnector):
        """Bind the service to the ``news`` collection of the connector's database.

        Args:
            connector: Connector whose ``database`` holds the collection.
        """
        self.collection = connector.database["news"]

    def get_all(self) -> list[News]:
        """Return every document of the collection converted to ``News``."""
        return [
            MongoEntryTransformer.transform_outgoing(document)
            for document in self.collection.find()
        ]

    def get_by_id(self, id: str) -> News | None:
        """Return the entry whose ``_id`` equals ``id``.

        Args:
            id: Identifier of the requested entry.

        Returns:
            The matching ``News`` object, or ``None`` when nothing matches.
        """
        # ``_id`` is unique, so a single lookup replaces the former
        # cursor-to-list round trip.
        document = self.collection.find_one({"_id": id})
        if document is None:
            return None
        return MongoEntryTransformer.transform_outgoing(document)

    def insert(self, news: News):
        """Insert one entry and return the result of ``insert_one``.

        Args:
            news: Entry to store.
        """
        return self.collection.insert_one(MongoEntryTransformer.transform_ingoing(news))

    def insert_many(self, news: list[News]):
        """Insert several entries with one ``insert_many`` call.

        Args:
            news: Entries to store.

        Returns:
            The result of ``insert_many``, or ``None`` when ``news`` is empty
            (the driver rejects an empty batch).
        """
        documents = [MongoEntryTransformer.transform_ingoing(entry) for entry in news]
        if not documents:
            return None
        return self.collection.insert_many(documents)
class MongoEntryTransformer:
    """Convert between ``News`` objects and their MongoDB document form."""

    @staticmethod
    def transform_ingoing(news: News) -> dict:
        """Turn a ``News`` object into a document keyed by ``_id``.

        Args:
            news: Entry to convert.

        Returns:
            The entry's fields with ``id`` replaced by ``_id``.
        """
        document = news.dict()
        # Swap the model's ``id`` key for the ``_id`` key used as primary key.
        document.pop("id")
        document["_id"] = news.id
        return document

    @staticmethod
    def transform_outgoing(data: dict) -> News:
        """Rebuild a ``News`` object from a stored document.

        Args:
            data: Document as produced by :meth:`transform_ingoing`.

        Returns:
            The ``News`` object described by ``data``.
        """
        copied_keys = ("title", "date", "text", "source_url")
        return News(id=data["_id"], **{key: data[key] for key in copied_keys})

View File

@ -1,21 +0,0 @@
from News.utils.mongodb.mongo import MongoConnector
from Unternehmensregister.models.Company import Company
from Unternehmensregister.utils.CompanyServiceInterface import CompanyServiceInterface
class CompanyMongoService(CompanyServiceInterface):
    """``CompanyServiceInterface`` implementation backed by the ``companies`` collection."""

    def __init__(self, connector: MongoConnector):
        """Bind the service to the ``companies`` collection.

        Args:
            connector: Connector whose ``database`` holds the collection.
        """
        self.collection = connector.database["companies"]

    @staticmethod
    def _id_filter(id) -> dict:
        """Build the query filter for ``id``, unpacking dataclass IDs to a dict."""
        from dataclasses import asdict, is_dataclass

        # Companies are stored via ``asdict``, so a dataclass ID has to be
        # compared in that same nested-dict form; plain values pass through.
        if is_dataclass(id) and not isinstance(id, type):
            return {"id": asdict(id)}
        return {"id": id}

    def get_all(self) -> list[Company]:
        """Return every document of the collection.

        NOTE(review): the documents are returned as read from the collection
        and are not converted to ``Company`` instances - confirm callers
        expect that.
        """
        return list(self.collection.find())

    def get_by_id(self, id: str) -> Company | None:
        """Return the single document whose ``id`` field matches ``id``.

        Args:
            id: Identifier to look up. Besides plain values, a dataclass
                identifier (as declared by the service interface) is accepted
                and matched in its dictionary form.

        Returns:
            The matching document, or ``None`` when there is not exactly one
            match.
        """
        matches = list(self.collection.find(self._id_filter(id)))
        if len(matches) != 1:
            return None
        return matches[0]

    def insert(self, company: Company):
        """Insert ``company`` and return the result of ``insert_one``.

        Args:
            company: Company to store; serialised through ``company.dict()``.
        """
        return self.collection.insert_one(company.dict())

View File

@ -1,14 +0,0 @@
from abc import ABC
from models import Company
class CompanyServiceInterface(ABC):
    """Contract for services that store and retrieve companies.

    The methods are not marked abstract: each default implementation raises
    ``NotImplementedError`` until a subclass overrides it.
    """

    def get_all(self) -> list[Company.Company]:
        """Return every stored company.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def get_by_id(self, id: Company.CompayID) -> Company.Company | None:
        """Return the company identified by ``id``, or ``None`` if there is none.

        Args:
            id: Identifier of the requested company.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def insert(self, company: Company.Company):
        """Store a single company.

        Args:
            company: Company to store.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

View File

@ -100,6 +100,9 @@ target-version = "py311"
# Avoid trying to fix flake8-bugbear (`B`) violations.
unfixable = ["B"]
[tool.ruff.flake8-builtins]
builtins-ignorelist = ["id"]
[tool.ruff.per-file-ignores]
"tests/*.py" = ["S101"]

View File

@ -0,0 +1 @@
"""Model classes."""

View File

@ -1,21 +1,32 @@
"""Company model."""
from abc import ABC
from dataclasses import asdict, dataclass
from enum import Enum
class RelationshipRoleEnum(Enum):
    """Role values that can be assigned to a ``CompanyRelationship``."""

    # NOTE(review): unlike ORGANISATION, this member's value is an empty
    # string rather than its own name - confirm that this is intended.
    STAKEHOLDER = ""
    ORGANISATION = "ORGANISATION"
@dataclass
class CompanyID:
    """Composite identifier of a company.

    Attributes:
        district_court: Court part of the identifier (presumably the district
            court keeping the register entry - confirm).
        hr_number: Number part of the identifier (presumably the commercial
            register number - confirm).
    """

    district_court: str
    hr_number: str


# Backward-compatible alias for the former, misspelled class name, which is
# still used in type annotations elsewhere in the project.
CompayID = CompanyID
@dataclass
class Location:
"""_summary_."""
city: str
street: str | None = None
house_number: str | None = None
@ -24,17 +35,34 @@ class Location:
@dataclass
class CompanyRelationship(ABC):
    """Base record for a party that is related to a company.

    Attributes:
        role: Role of the related party.
        location: Location associated with the related party.
    """

    role: RelationshipRoleEnum
    location: Location
@dataclass
class Company:
    """Record describing a single company.

    Attributes:
        id: Composite identifier of the company.
        location: Location of the company.
        name: Name of the company.
        last_update: Time of the last update, kept as a string (format not
            visible here - confirm with the producer of the data).
        relationships: Parties related to the company.
    """

    id: CompanyID
    location: Location
    name: str
    last_update: str
    relationships: list[CompanyRelationship]

    def to_dict(self) -> dict:
        """Return the company, including nested dataclasses, as a dictionary.

        NOTE(review): ``asdict`` only unpacks dataclasses, lists, tuples and
        dicts; enum members such as a relationship's ``role`` stay enum
        objects in the result - confirm the consumer can serialise them.

        Returns:
            Dictionary keyed by field name.
        """
        return asdict(self)

View File

@ -0,0 +1,25 @@
"""News model."""
from dataclasses import asdict, dataclass
@dataclass
class News:
    """Plain record describing one news article.

    Every field is kept as the string that was supplied; the class performs
    no parsing or validation of its own.
    """

    id: str
    title: str
    date: str
    text: str
    source_url: str

    def to_dict(self) -> dict:
        """Return the article's fields as a dictionary keyed by field name.

        Returns:
            dict: One entry per dataclass field, in declaration order.
        """
        field_values = asdict(self)
        return field_values

View File

@ -0,0 +1 @@
"""Util classes and services."""

View File

@ -0,0 +1,54 @@
"""CompanyMongoService."""
from models.company import Company
from utils.company_service_interface import CompanyServiceInterface
from utils.mongo import MongoConnector
class CompanyMongoService(CompanyServiceInterface):
    """``CompanyServiceInterface`` implementation backed by the ``companies`` collection."""

    def __init__(self, connector: MongoConnector):
        """Bind the service to the ``companies`` collection.

        Args:
            connector (MongoConnector): Connector whose ``database`` holds
                the collection.
        """
        self.collection = connector.database["companies"]

    @staticmethod
    def _id_filter(id) -> dict:
        """Build the query filter for ``id``, unpacking dataclass IDs to a dict."""
        from dataclasses import asdict, is_dataclass

        # Companies are stored via ``to_dict`` (``asdict``), so a dataclass ID
        # has to be compared in that same nested-dict form; plain values pass
        # through unchanged.
        if is_dataclass(id) and not isinstance(id, type):
            return {"id": asdict(id)}
        return {"id": id}

    def get_all(self) -> list[Company]:
        """Return every document of the collection.

        NOTE(review): the documents are returned as read from the collection
        and are not converted to ``Company`` instances - confirm callers
        expect that.

        Returns:
            list[Company]: All stored entries.
        """
        return list(self.collection.find())

    def get_by_id(self, id: str) -> Company | None:
        """Return the single document whose ``id`` field matches ``id``.

        Args:
            id (str): Identifier to look up. A dataclass identifier (as
                declared by the service interface) is accepted as well and is
                matched in its dictionary form.

        Returns:
            Company | None: The matching entry, or ``None`` when there is not
            exactly one match.
        """
        matches = list(self.collection.find(self._id_filter(id)))
        if len(matches) != 1:
            return None
        return matches[0]

    def insert(self, company: Company):
        """Insert ``company`` into the collection.

        Args:
            company (Company): Company to store; serialised via ``to_dict``.

        Returns:
            The result object of ``insert_one``.
        """
        return self.collection.insert_one(company.to_dict())

View File

@ -0,0 +1,51 @@
"""CompanyServiceInterface."""
import abc
from models import Company
class CompanyServiceInterface(abc.ABC):
    """Abstract interface for reading and storing companies.

    Every method is abstract, so a concrete service must override all of them
    before it can be instantiated.
    """

    # NOTE(review): the annotations below treat the ``Company`` name imported
    # at the top of this file as the module holding the model classes -
    # confirm that import still resolves now that other modules import from
    # ``models.company``.

    @abc.abstractmethod
    def get_all(self) -> list[Company.Company]:
        """Return all stored companies.

        Raises:
            NotImplementedError: To be defined by child classes.

        Returns:
            list[Company.Company]: Every stored company.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_by_id(self, id: Company.CompanyID) -> Company.Company | None:
        """Return the company identified by ``id``.

        Args:
            id (Company.CompanyID): Identifier of the requested company.

        Raises:
            NotImplementedError: To be defined by child classes.

        Returns:
            Company.Company | None: The company, or ``None`` if it is unknown.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def insert(self, company: Company.Company):
        """Store a single company.

        Args:
            company (Company.Company): Company to store.

        Raises:
            NotImplementedError: To be defined by child classes.
        """
        raise NotImplementedError

View File

@ -0,0 +1,155 @@
"""Mongo Wrapper."""
from dataclasses import dataclass
import pymongo
from models.news import News
from utils.news_service_interface import (
NewsServiceInterface,
)
@dataclass
class MongoConnection:
"""_summary_."""
hostname: str
database: str
port: int | None
username: str | None
password: str | None
class MongoConnector:
    """Wrapper for establishing a connection to a MongoDB instance.

    Attributes:
        client: The ``pymongo.MongoClient`` returned by :meth:`connect`.
        database: Handle for the database named in the connection settings.
    """

    def __init__(self, connection: MongoConnection):
        """Open the client and select the configured database.

        Args:
            connection (MongoConnection): Host, port, credentials and
                database name to use.
        """
        self.client = self.connect(
            connection.hostname,
            connection.port,
            connection.username,
            connection.password,
        )
        self.database = self.client[connection.database]

    def connect(
        self,
        hostname: str,
        port: int | None,
        username: str | None,
        password: str | None,
    ) -> pymongo.MongoClient:
        """Create the client for the given server.

        Credentials are handed to ``MongoClient`` as keyword options instead
        of being interpolated into the URI, so user names and passwords with
        reserved characters (``@``, ``:``, ``/`` ...) need no percent-escaping.

        Args:
            hostname (str): Host name of the MongoDB server.
            port (int | None): Server port, or ``None``.
            username (str | None): User name, or ``None``.
            password (str | None): Password, or ``None``.

        Returns:
            pymongo.MongoClient: Client for the requested server.
        """
        # As before, an explicit port switches from the SRV scheme to the
        # plain ``mongodb`` scheme.
        if port is None:
            connection_string = f"mongodb+srv://{hostname}"
        else:
            connection_string = f"mongodb://{hostname}:{port}"
        # Credentials are only applied when both parts are present.
        if username is None or password is None:
            return pymongo.MongoClient(connection_string)
        return pymongo.MongoClient(
            connection_string,
            username=username,
            password=password,
        )
class MongoNewsService(NewsServiceInterface):
    """``NewsServiceInterface`` implementation backed by the ``news`` collection."""

    def __init__(self, connector: MongoConnector):
        """Bind the service to the ``news`` collection.

        Args:
            connector (MongoConnector): Connector whose ``database`` holds
                the collection.
        """
        self.collection = connector.database["news"]

    def get_all(self) -> list[News]:
        """Return every document of the collection converted to ``News``.

        Returns:
            list[News]: All stored entries.
        """
        return [
            MongoEntryTransformer.transform_outgoing(document)
            for document in self.collection.find()
        ]

    def get_by_id(self, id: str) -> News | None:
        """Return the entry whose ``_id`` equals ``id``.

        Args:
            id (str): Identifier of the requested entry.

        Returns:
            News | None: The matching entry, or ``None`` when nothing matches.
        """
        # ``_id`` is unique, so a single lookup replaces the former
        # cursor-to-list round trip.
        document = self.collection.find_one({"_id": id})
        if document is None:
            return None
        return MongoEntryTransformer.transform_outgoing(document)

    def insert(self, news: News):
        """Insert one entry into the collection.

        Args:
            news (News): Entry to store.

        Returns:
            The result object of ``insert_one``.
        """
        return self.collection.insert_one(MongoEntryTransformer.transform_ingoing(news))

    def insert_many(self, news: list[News]):
        """Insert several entries with one ``insert_many`` call.

        The interface declares this method as abstract, so it has to exist
        for the service to be instantiable at all.

        Args:
            news (list[News]): Entries to store.

        Returns:
            The result object of ``insert_many``, or ``None`` when ``news``
            is empty (the driver rejects an empty batch).
        """
        documents = [MongoEntryTransformer.transform_ingoing(entry) for entry in news]
        if not documents:
            return None
        return self.collection.insert_many(documents)
class MongoEntryTransformer:
    """Convert between ``News`` objects and their MongoDB document form."""

    @staticmethod
    def transform_ingoing(news: News) -> dict:
        """Turn a ``News`` object into a document keyed by ``_id``.

        Args:
            news (News): Entry to convert.

        Returns:
            dict: The entry's fields with ``id`` replaced by ``_id``.
        """
        document = news.to_dict()
        # Swap the model's ``id`` key for the ``_id`` key used as primary key.
        document.pop("id")
        document["_id"] = news.id
        return document

    @staticmethod
    def transform_outgoing(data: dict) -> News:
        """Rebuild a ``News`` object from a stored document.

        Args:
            data (dict): Document as produced by :meth:`transform_ingoing`.

        Returns:
            News: The entry described by ``data``.
        """
        copied_keys = ("title", "date", "text", "source_url")
        return News(id=data["_id"], **{key: data[key] for key in copied_keys})

View File

@ -0,0 +1,63 @@
"""NewsServiceInterface."""
import abc
from models.news import News
class NewsServiceInterface(abc.ABC):
    """Abstract interface for a service that reads and stores ``News`` entries.

    Every method is abstract, so a concrete service must override all of them
    before it can be instantiated.
    """

    @abc.abstractmethod
    def get_all(self) -> list[News]:
        """Return all stored news articles.

        Raises:
            NotImplementedError: To be defined by child classes.

        Returns:
            list[News]: Every stored entry.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_by_id(self, id: str) -> News | None:
        """Look up a single entry by its ID.

        Args:
            id (str): ID identifying the entry.

        Raises:
            NotImplementedError: To be defined by child classes.

        Returns:
            News | None: The entry, or ``None`` if no entry has this ID.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def insert(self, news: News):
        """Store one news entry.

        Args:
            news (News): Entry to be saved.

        Raises:
            NotImplementedError: To be defined by child classes.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def insert_many(self, news: list[News]):
        """Store several news entries at once.

        Args:
            news (list[News]): Entries to be saved.

        Raises:
            NotImplementedError: To be defined by child classes.
        """
        raise NotImplementedError()