import pymongo from News.models.News import News from News.utils.NewsServiceInterface import NewsServiceInterface class MongoConnector: def __init__( self, hostname, database: str, port: int | None, username: str | None = None, password: str | None = None, ): self.client = self.connect(hostname, port, username, password) databases = self.client.list_database_names() if database not in databases: print(f"Database {database} will be created") self.database = self.client[database] def connect(self, hostname, port, username, password) -> pymongo.MongoClient: if username is not None and password is not None: connection_string = f"mongodb+srv://{username}:{password}@{hostname}" else: connection_string = f"mongodb+srv://{hostname}" if port is not None: connection_string += f":{port}" connection_string = connection_string.replace("mongodb+srv", "mongodb") print(connection_string) return pymongo.MongoClient(connection_string) class MongoNewsService(NewsServiceInterface): def __init__(self, connector: MongoConnector): self.collection = connector.database["news"] def get_all(self) -> list[News]: result = self.collection.find() return [MongoEntryTransformer.transform_outgoing(elem) for elem in result] def get_by_id(self, id: str) -> News | None: result = list(self.collection.find({"_id": id})) if len(result) == 1: return MongoEntryTransformer.transform_outgoing(list(result)[0]) return None def insert(self, news: News): return self.collection.insert_one(MongoEntryTransformer.transform_ingoing(news)) class MongoEntryTransformer: @staticmethod def transform_ingoing(news: News) -> dict: transport_object = news.dict() transport_object["_id"] = news.id del transport_object["id"] return transport_object @staticmethod def transform_outgoing(data: dict) -> News: return News( id=data["_id"], title=data["title"], date=data["date"], text=data["text"], source_url=data["source_url"], )