feat(data-extraction): Provide KPI table analysis in bundesanzeiger wrapper

This commit is contained in:
TrisNol 2023-10-20 15:56:52 +02:00
parent 815e08a8f1
commit f8a0d58314
2 changed files with 1150 additions and 40 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,8 @@
"""Fetch data from Bundesanzeiger.""" """Fetch data from Bundesanzeiger."""
import re import re
from io import StringIO
import numpy as np
import pandas as pd import pandas as pd
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from deutschland.bundesanzeiger import Bundesanzeiger as Ba from deutschland.bundesanzeiger import Bundesanzeiger as Ba
@ -49,8 +51,11 @@ class Bundesanzeiger:
df_data["auditors"] = audits df_data["auditors"] = audits
# Add Financial information # Add Financial information
# df_data["financial_results"] = df_data.raw_report.apply(
# self.extract_financial_results
# )
df_data["financial_results"] = df_data.raw_report.apply( df_data["financial_results"] = df_data.raw_report.apply(
self.extract_financial_results self.parse_tables_to_kpis
) )
# Remove irrelevant columns # Remove irrelevant columns
@ -190,6 +195,110 @@ class Bundesanzeiger:
) )
return self.__extract_kpis__(report_parsed) return self.__extract_kpis__(report_parsed)
def __extract_tables_from_report__(self, report: str) -> list[pd.DataFrame]:
    """Collect the ``std_table`` HTML tables of a report as DataFrames.

    Every ``<table class="std_table">`` element is handed to
    ``pandas.read_html`` on its own. A table that cannot be parsed is
    skipped, so a single malformed table does not discard the others.

    Args:
        report (str): Raw report (HTML markup)

    Returns:
        list[pd.DataFrame]: One DataFrame per successfully parsed table,
            in document order.
    """
    # Local import keeps this method self-contained.
    import logging

    logger = logging.getLogger(__name__)
    soup = BeautifulSoup(report, features="html.parser")
    frames: list[pd.DataFrame] = []
    for position, table in enumerate(soup.find_all("table", {"class": "std_table"})):
        try:
            parsed = pd.read_html(StringIO(str(table)), flavor="bs4")
        except Exception:
            # Deliberately best-effort: the failure is recorded instead of
            # being dropped silently, and the remaining tables are still read.
            logger.debug(
                "Skipping std_table #%d: could not be parsed",
                position,
                exc_info=True,
            )
            continue
        if parsed:
            frames.append(parsed[0])
    return frames
# ruff: noqa: PLR0912
def parse_tables_to_kpis(self, report: str) -> dict:
"""Extract KPIs from tables included in a report.
Args:
report (str): Raw report
Returns:
dict: Extracted KPIs
"""
kpis = {}
tables = self.__extract_tables_from_report__(report)
for table in tables:
def cleanse_string(value: str) -> str | None:
if value is not None and isinstance(value, str):
return re.sub(r"(.+\.).", "", value)
return None
def parse_string_to_float(value: str | float) -> float | None:
try:
if value is None:
return None
return float(str(value).replace(".", "").replace(",", "."))
except Exception:
return None
def apply_factor(value: str, factor: float) -> float | None:
transformed_value = parse_string_to_float(value)
if transformed_value is None or isinstance(transformed_value, str):
return None
return transformed_value * factor
for index, row in table.iterrows():
table.iloc[index][0] = cleanse_string(row.iloc[0]) # type: ignore
converter = {
"Mio€": 1 * 10**6,
"Mio": 1 * 10**6,
"T€": 1 * 10**3,
"TEUR": 1 * 10**3,
"EUR": 1,
"": 1,
}
for column in table.columns[1:]:
if isinstance(column, tuple):
for c in column:
for x, factor in converter.items():
if x in c:
table[column] = table[column].apply(
lambda x, factor=factor: apply_factor(x, factor)
)
break
else:
for x, factor in converter.items():
parts = str(column).split(" ")
for y in parts:
if re.match(x, y):
table[column] = table[column].apply(
lambda x, factor=factor: apply_factor(x, factor)
)
table = table.rename({column: parts[0]}, axis=1)
break
table = table.dropna(axis=0, how="all")
table = table.dropna(axis=1, how="all")
columns_to_prune = []
for column_index, column_type in enumerate(table.dtypes[1:]):
if column_type in ["object", "str"]:
columns_to_prune.append(column_index + 1)
table = table.drop(table.columns[columns_to_prune], axis="columns")
table = table.replace(to_replace="None", value=np.nan)
table = table.dropna()
if len(table.columns) <= 1:
continue
exps = [r"^[0-9a-zA-Z]+[\.\)] ", r"[\+\=\-\_]"]
for _index, row in table.iterrows():
name_cleansed = row.iloc[0]
if not isinstance(name_cleansed, str):
continue
for exp in exps:
# print(row[0])
name_cleansed = re.sub(exp, "", name_cleansed).strip()
kpis[name_cleansed] = row.iloc[1]
return kpis
if __name__ == "__main__": if __name__ == "__main__":
ba_wrapper = Bundesanzeiger() ba_wrapper = Bundesanzeiger()