from urllib.parse import quote_plus

import pymongo

from models.News import News
from utils.NewsServiceInterface import NewsServiceInterface


class MongoConnector:
    """Open a MongoDB client and expose a handle to one named database."""

    def __init__(
        self,
        hostname: str,
        database: str,
        port: int = 27017,
        username: str | None = None,
        password: str | None = None,
    ):
        """Connect to the server and select ``database``.

        Args:
            hostname: Host part of the MongoDB URI.
            database: Name of the database to select.
            port: Port part of the MongoDB URI.
            username: Optional user name; only used when ``password`` is
                also given.
            password: Optional password; only used when ``username`` is
                also given.
        """
        self.client = self.connect(hostname, port, username, password)
        # Only informational: selecting a missing database does not fail here.
        if database not in self.client.list_database_names():
            print(f"Database {database} will be created")
        self.database = self.client[database]

    def connect(self, hostname, port, username, password) -> pymongo.MongoClient:
        """Build the MongoDB URI and return a client for it.

        Credentials are included only when both ``username`` and ``password``
        are not ``None``; otherwise the URI carries no credentials. Pass the
        raw (unescaped) credentials: they are percent-escaped here so that
        reserved characters such as ``@``, ``:`` or ``/`` do not corrupt
        the URI.
        """
        if username is not None and password is not None:
            credentials = f"{quote_plus(username)}:{quote_plus(password)}"
            connection_string = f"mongodb://{credentials}@{hostname}:{port}"
        else:
            connection_string = f"mongodb://{hostname}:{port}"
        return pymongo.MongoClient(connection_string)


class MongoNewsService(NewsServiceInterface):
    """News service that reads and writes the ``news`` collection."""

    def __init__(self, connector: MongoConnector):
        """Bind the service to the ``news`` collection of ``connector``."""
        self.collection = connector.database["news"]

    def get_all(self) -> list[News]:
        """Return every document in the collection converted to ``News``."""
        return [
            MongoEntryTransformer.transform_outgoing(document)
            for document in self.collection.find()
        ]

    def get_by_id(self, id: str) -> News | None:
        """Return the entry whose ``_id`` equals ``id``, or ``None``."""
        # ``_id`` is unique per collection, so a single-document lookup is
        # sufficient and avoids materialising a cursor into a list.
        document = self.collection.find_one({"_id": id})
        if document is None:
            return None
        return MongoEntryTransformer.transform_outgoing(document)

    def insert(self, news: News):
        """Insert ``news`` and return the result of ``insert_one``."""
        return self.collection.insert_one(
            MongoEntryTransformer.transform_ingoing(news)
        )


class MongoEntryTransformer:
    """Convert between ``News`` objects and the stored document layout."""

    @staticmethod
    def transform_ingoing(news: News) -> dict:
        """Return ``news`` as a dict with its ``id`` stored under ``_id``."""
        transport_object = news.dict()
        # The stored document keys the identifier as ``_id`` instead of ``id``.
        del transport_object["id"]
        transport_object["_id"] = news.id
        return transport_object

    @staticmethod
    def transform_outgoing(data: dict) -> News:
        """Build a ``News`` from a stored document.

        Raises:
            KeyError: If ``data`` lacks ``_id``, ``title``, ``date``,
                ``text`` or ``source_url``.
        """
        return News(
            id=data["_id"],
            title=data["title"],
            date=data["date"],
            text=data["text"],
            source_url=data["source_url"],
        )