diff --git a/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py b/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py index 0667a87..156c71e 100644 --- a/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py +++ b/src/aki_prj23_transparenzregister/apps/enrich_company_financials.py @@ -32,11 +32,31 @@ def work(company: typing.Any, company_service: CompanyMongoService) -> None: if __name__ == "__main__": + import concurrent.futures + + from tqdm import tqdm + config_provider = JsonFileConfigProvider("./secrets.json") mongo_connector = MongoConnector(config_provider.get_mongo_connection_string()) company_service = CompanyMongoService(mongo_connector) - companies = company_service.get_all() - for company in companies: - work(company, company_service) + num_threads = 25 + companies = company_service.get_where_no_financial_results() + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + # Submit tasks for each entry in the list + future_to_entry = { + executor.submit(work, entry, company_service): entry for entry in companies + } + + with tqdm(total=len(companies)) as pbar: + # Wait for all tasks to complete + for future in concurrent.futures.as_completed(future_to_entry): + entry = future_to_entry[future] + # try: + # Get the result of the completed task (if needed) + result = future.result() + pbar.set_description(entry["name"]) + pbar.update(1) + # except Exception as e: + # print(f"Error processing entry {entry}: {e}") diff --git a/src/aki_prj23_transparenzregister/utils/data_extraction/bundesanzeiger.py b/src/aki_prj23_transparenzregister/utils/data_extraction/bundesanzeiger.py index c2c28d7..ae81308 100644 --- a/src/aki_prj23_transparenzregister/utils/data_extraction/bundesanzeiger.py +++ b/src/aki_prj23_transparenzregister/utils/data_extraction/bundesanzeiger.py @@ -8,6 +8,8 @@ from deutschland.bundesanzeiger import Bundesanzeiger as Ba from aki_prj23_transparenzregister.models.auditor import Auditor from aki_prj23_transparenzregister.models.company import FinancialKPIEnum +pd.options.mode.chained_assignment = "" + class Bundesanzeiger: """Bundesanzeiger wrapper to export relevant information.""" @@ -115,22 +117,22 @@ class Bundesanzeiger: # Define KPI patterns to search for kpi_patterns = { - FinancialKPIEnum.REVENUE: r"(?:revenue|umsatz|erlöse)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.NET_INCOME: r"(?:net income|jahresüberschuss|nettoeinkommen|Ergebnis nach Steuern)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.EBIT: r"(?:ebit|operating income)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.EBITDA: r"(?:ebitda)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.GROSS_PROFIT: r"(?:gross profit|bruttogewinn)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.OPERATING_PROFIT: r"(?:operating profit|betriebsgewinn)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.ASSETS: r"(?:total assets|bilanzsumme)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.LIABILITIES: r"(?:total liabilities|gesamtverbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.EQUITY: r"(?:shareholders'? equity|eigenkapital)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.CURRENT_ASSETS: r"(?:current assets|umlaufvermögen)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.CURRENT_LIABILITIES: r"(?:current liabilities|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.LONG_TERM_DEBT: r"(?:long[-\s]?term debt|langfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.SHORT_TERM_DEBT: r"(?:short[-\s]?term debt|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.CASH_AND_CASH_EQUIVALENTS: r"(?:cash (?:and cash equivalents)?|barmittel)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.DIVIDENDS: r"(?:dividends?|dividende)[:\s]*([\d,.]+[mmb]?)", - FinancialKPIEnum.CASH_FLOW: r"(?:cash flow|cashflow|cash flow from operating activities)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.REVENUE.value: r"(?:revenue|umsatz|erlöse)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.NET_INCOME.value: r"(?:net income|jahresüberschuss|nettoeinkommen|Ergebnis nach Steuern)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.EBIT.value: r"(?:ebit|operating income)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.EBITDA.value: r"(?:ebitda)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.GROSS_PROFIT.value: r"(?:gross profit|bruttogewinn)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.OPERATING_PROFIT.value: r"(?:operating profit|betriebsgewinn)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.ASSETS.value: r"(?:total assets|bilanzsumme)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.LIABILITIES.value: r"(?:total liabilities|gesamtverbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.EQUITY.value: r"(?:shareholders'? equity|eigenkapital)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.CURRENT_ASSETS.value: r"(?:current assets|umlaufvermögen)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.CURRENT_LIABILITIES.value: r"(?:current liabilities|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.LONG_TERM_DEBT.value: r"(?:long[-\s]?term debt|langfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.SHORT_TERM_DEBT.value: r"(?:short[-\s]?term debt|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.CASH_AND_CASH_EQUIVALENTS.value: r"(?:cash (?:and cash equivalents)?|barmittel)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.DIVIDENDS.value: r"(?:dividends?|dividende)[:\s]*([\d,.]+[mmb]?)", + FinancialKPIEnum.CASH_FLOW.value: r"(?:cash flow|cashflow|cash flow from operating activities)[:\s]*([\d,.]+[mmb]?)", } for kpi, pattern in kpi_patterns.items(): diff --git a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py index ae8bac2..cce8a25 100644 --- a/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py +++ b/src/aki_prj23_transparenzregister/utils/mongo/company_mongo_service.py @@ -1,4 +1,6 @@ """CompanyMongoService.""" +from threading import Lock + from bson.objectid import ObjectId from pymongo.results import InsertOneResult, UpdateResult @@ -16,6 +18,7 @@ class CompanyMongoService: connector (MongoConnector): _description_ """ self.collection = connector.database["companies"] + self.lock = Lock() # Create a lock for synchronization def get_all(self) -> list[Company]: """_summary_. @@ -23,8 +26,9 @@ class CompanyMongoService: Returns: list[Company]: _description_ """ - result = self.collection.find() - return list(result) + with self.lock: + result = self.collection.find() + return list(result) def get_by_id(self, id: CompanyID) -> Company | None: """_summary_. @@ -35,10 +39,11 @@ class CompanyMongoService: Returns: Company | None: _description_ """ - result = list(self.collection.find({"id": id})) - if len(result) == 1: - return result[0] - return None + with self.lock: + result = list(self.collection.find({"id": id})) + if len(result) == 1: + return result[0] + return None def get_by_object_id(self, _id: str) -> dict | None: """Find an object by given _id. @@ -49,10 +54,22 @@ class CompanyMongoService: Returns: Company | None: Entry if found, otherwise None """ - result = list(self.collection.find({"_id": ObjectId(_id)})) - if len(result) == 1: - return result[0] - return None + with self.lock: + result = list(self.collection.find({"_id": ObjectId(_id)})) + if len(result) == 1: + return result[0] + return None + + def get_where_no_financial_results(self) -> list[dict]: + """Get all entries that have no yearly_results. + + Returns: + list[dict]: List of companies found + """ + with self.lock: + return list( + self.collection.find({"$or": [{"yearly_results": {"$exists": False}}]}) + ) def insert(self, company: Company) -> InsertOneResult: """_summary_. @@ -63,7 +80,8 @@ class CompanyMongoService: Returns: _type_: _description_ """ - return self.collection.insert_one(company.to_dict()) + with self.lock: + return self.collection.insert_one(company.to_dict()) def add_yearly_results(self, _id: str, yearly_results: dict) -> UpdateResult: """Add the `yearly_results` field to a Company entry. @@ -75,6 +93,7 @@ class CompanyMongoService: Returns: UpdateResult: Result """ - return self.collection.update_one( - {"_id": ObjectId(_id)}, {"$set": {"yearly_results": yearly_results}} - ) + with self.lock: + return self.collection.update_one( + {"_id": ObjectId(_id)}, {"$set": {"yearly_results": yearly_results}} + )