multi-thread company financial ingestion

This commit is contained in:
TrisNol 2023-08-29 18:38:25 +02:00
parent ac07bd8e8e
commit 7a2bc5db2b
3 changed files with 74 additions and 33 deletions

View File

@ -32,11 +32,31 @@ def work(company: typing.Any, company_service: CompanyMongoService) -> None:
if __name__ == "__main__":
import concurrent.futures
from tqdm import tqdm
config_provider = JsonFileConfigProvider("./secrets.json")
mongo_connector = MongoConnector(config_provider.get_mongo_connection_string())
company_service = CompanyMongoService(mongo_connector)
companies = company_service.get_all()
for company in companies:
work(company, company_service)
num_threads = 25
companies = company_service.get_where_no_financial_results()
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
# Submit tasks for each entry in the list
future_to_entry = {
executor.submit(work, entry, company_service): entry for entry in companies
}
with tqdm(total=len(companies)) as pbar:
# Wait for all tasks to complete
for future in concurrent.futures.as_completed(future_to_entry):
entry = future_to_entry[future]
# try:
# Get the result of the completed task (if needed)
result = future.result()
pbar.set_description(entry["name"])
pbar.update(1)
# except Exception as e:
# print(f"Error processing entry {entry}: {e}")

View File

@ -8,6 +8,8 @@ from deutschland.bundesanzeiger import Bundesanzeiger as Ba
from aki_prj23_transparenzregister.models.auditor import Auditor
from aki_prj23_transparenzregister.models.company import FinancialKPIEnum
pd.options.mode.chained_assignment = ""
class Bundesanzeiger:
"""Bundesanzeiger wrapper to export relevant information."""
@ -115,22 +117,22 @@ class Bundesanzeiger:
# Define KPI patterns to search for
kpi_patterns = {
FinancialKPIEnum.REVENUE: r"(?:revenue|umsatz|erlöse)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.NET_INCOME: r"(?:net income|jahresüberschuss|nettoeinkommen|Ergebnis nach Steuern)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBIT: r"(?:ebit|operating income)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBITDA: r"(?:ebitda)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.GROSS_PROFIT: r"(?:gross profit|bruttogewinn)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.OPERATING_PROFIT: r"(?:operating profit|betriebsgewinn)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.ASSETS: r"(?:total assets|bilanzsumme)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.LIABILITIES: r"(?:total liabilities|gesamtverbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EQUITY: r"(?:shareholders'? equity|eigenkapital)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CURRENT_ASSETS: r"(?:current assets|umlaufvermögen)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CURRENT_LIABILITIES: r"(?:current liabilities|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.LONG_TERM_DEBT: r"(?:long[-\s]?term debt|langfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.SHORT_TERM_DEBT: r"(?:short[-\s]?term debt|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CASH_AND_CASH_EQUIVALENTS: r"(?:cash (?:and cash equivalents)?|barmittel)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.DIVIDENDS: r"(?:dividends?|dividende)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CASH_FLOW: r"(?:cash flow|cashflow|cash flow from operating activities)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.REVENUE.value: r"(?:revenue|umsatz|erlöse)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.NET_INCOME.value: r"(?:net income|jahresüberschuss|nettoeinkommen|Ergebnis nach Steuern)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBIT.value: r"(?:ebit|operating income)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBITDA.value: r"(?:ebitda)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.GROSS_PROFIT.value: r"(?:gross profit|bruttogewinn)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.OPERATING_PROFIT.value: r"(?:operating profit|betriebsgewinn)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.ASSETS.value: r"(?:total assets|bilanzsumme)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.LIABILITIES.value: r"(?:total liabilities|gesamtverbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EQUITY.value: r"(?:shareholders'? equity|eigenkapital)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CURRENT_ASSETS.value: r"(?:current assets|umlaufvermögen)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CURRENT_LIABILITIES.value: r"(?:current liabilities|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.LONG_TERM_DEBT.value: r"(?:long[-\s]?term debt|langfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.SHORT_TERM_DEBT.value: r"(?:short[-\s]?term debt|kurzfristige verbindlichkeiten)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CASH_AND_CASH_EQUIVALENTS.value: r"(?:cash (?:and cash equivalents)?|barmittel)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.DIVIDENDS.value: r"(?:dividends?|dividende)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.CASH_FLOW.value: r"(?:cash flow|cashflow|cash flow from operating activities)[:\s]*([\d,.]+[mmb]?)",
}
for kpi, pattern in kpi_patterns.items():

View File

@ -1,4 +1,6 @@
"""CompanyMongoService."""
from threading import Lock
from bson.objectid import ObjectId
from pymongo.results import InsertOneResult, UpdateResult
@ -16,6 +18,7 @@ class CompanyMongoService:
connector (MongoConnector): _description_
"""
self.collection = connector.database["companies"]
self.lock = Lock() # Create a lock for synchronization
def get_all(self) -> list[Company]:
"""_summary_.
@ -23,8 +26,9 @@ class CompanyMongoService:
Returns:
list[Company]: _description_
"""
result = self.collection.find()
return list(result)
with self.lock:
result = self.collection.find()
return list(result)
def get_by_id(self, id: CompanyID) -> Company | None:
"""_summary_.
@ -35,10 +39,11 @@ class CompanyMongoService:
Returns:
Company | None: _description_
"""
result = list(self.collection.find({"id": id}))
if len(result) == 1:
return result[0]
return None
with self.lock:
result = list(self.collection.find({"id": id}))
if len(result) == 1:
return result[0]
return None
def get_by_object_id(self, _id: str) -> dict | None:
"""Find an object by given _id.
@ -49,10 +54,22 @@ class CompanyMongoService:
Returns:
Company | None: Entry if found, otherwise None
"""
result = list(self.collection.find({"_id": ObjectId(_id)}))
if len(result) == 1:
return result[0]
return None
with self.lock:
result = list(self.collection.find({"_id": ObjectId(_id)}))
if len(result) == 1:
return result[0]
return None
def get_where_no_financial_results(self) -> list[dict]:
"""Get all entries that have no yearly_results.
Returns:
list[dict]: List of companies found
"""
with self.lock:
return list(
self.collection.find({"$or": [{"yearly_results": {"$exists": False}}]})
)
def insert(self, company: Company) -> InsertOneResult:
"""_summary_.
@ -63,7 +80,8 @@ class CompanyMongoService:
Returns:
_type_: _description_
"""
return self.collection.insert_one(company.to_dict())
with self.lock:
return self.collection.insert_one(company.to_dict())
def add_yearly_results(self, _id: str, yearly_results: dict) -> UpdateResult:
"""Add the `yearly_results` field to a Company entry.
@ -75,6 +93,7 @@ class CompanyMongoService:
Returns:
UpdateResult: Result
"""
return self.collection.update_one(
{"_id": ObjectId(_id)}, {"$set": {"yearly_results": yearly_results}}
)
with self.lock:
return self.collection.update_one(
{"_id": ObjectId(_id)}, {"$set": {"yearly_results": yearly_results}}
)