Transfer financials from staging to sql (#129)

This commit is contained in:
Philipp Horstenkamp 2023-09-17 13:45:08 +02:00 committed by GitHub
parent fea31e543b
commit 56b6280264
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 374 additions and 22 deletions

View File

@ -42,7 +42,7 @@ class JsonFileConfigProvider(ConfigProvider):
__data__: dict = {}
def __init__(self, file_path: str):
"""Constructor reading it's data from given .json file.
"""Constructor reading its data from a given .json file.
Args:
file_path (str): PATH to .json file containing config
@ -76,7 +76,7 @@ class JsonFileConfigProvider(ConfigProvider):
)
def get_mongo_connection_string(self) -> MongoConnection:
"""Read MongodB connection string from .json file added in constructor.
"""Read MongoDB connection string from .json file added in constructor.
Returns:
MongoConnection: Connection details
@ -122,7 +122,7 @@ class EnvironmentConfigProvider(ConfigProvider):
)
def get_mongo_connection_string(self) -> MongoConnection:
"""Read MongodB connection string from environment variables.
"""Read MongoDB connection string from environment variables.
Returns:
MongoConnection: Connection details

View File

@ -304,13 +304,15 @@ def add_relationships(companies: list[dict[str, dict]], db: Session) -> None:
companies: Companies to be added to the db.
db: A session to connect to an SQL db via SQLAlchemy.
"""
total: int = sum(len(company.get("relationships", [])) for company in companies)
total: int = sum(len(company.get("relationships", "")) for company in companies)
with tqdm(
total=total,
desc="Company connections added",
) as pbar:
for company in companies:
relationships: list[dict[str, Any]] = company.get("relationships", []) # type: ignore
relationships: list[dict[str, Any]] = company.get("relationships", "") # type: ignore
if not relationships:
continue
try:
company_id: int = get_company_id(
company["name"], # type: ignore
@ -333,20 +335,93 @@ def add_relationships(companies: list[dict[str, dict]], db: Session) -> None:
logger.info("Company connections added.")
def transfer_data(db: Session | None) -> None:
# yearly_results
def add_annual_report(company_id: int, year: int, report: dict, db: Session) -> None:
    """Add an annual financial report to the SQL database.

    The added report is linked with the company via ``company_id``.
    The auditors of a report are not persisted yet.

    Args:
        company_id: The SQL id of the company.
        year: The year of the result.
        report: The yearly result. Its ``financials`` mapping is expanded into
            keyword arguments of ``AnnualFinanceStatement``.
        db: A session to connect to an SQL db via SQLAlchemy.

    Raises:
        KeyError: If the report holds neither auditors nor financials and no
            company with the given id exists.
        TypeError: If ``financials`` contains a key that is not accepted by
            ``AnnualFinanceStatement``.
    """
    auditors = report.get("auditors")
    # ``or {}`` also covers a key that is present but None / empty, which ``**`` could not unpack.
    financials = report.get("financials") or {}
    if not auditors and not financials:
        company = db.query(entities.Company).get(company_id)
        if company is None:
            raise KeyError(f"The company with the id {company_id} could not be found.")
        logger.debug(f"No financial data found for {company.name} in the year {year}.")
        return
    db.add(
        entities.AnnualFinanceStatement(
            company_id=company_id,
            # Only the year is handed in, so the statement is dated to January 1st.
            date=date(year, 1, 1),  # TODO can we have a date?
            **financials,
        ),
    )
    # TODO add a link to the accountant: the auditors are not stored yet since it is
    #  still open how to create the person / company relation for them.
def add_annual_financial_reports(companies: list[dict], db: Session) -> None:
    """Add all the yearly results to the SQL db.

    Companies without yearly results are skipped. Yearly results that are empty
    or whose year can not be cast to an int are skipped as well, the latter with
    a warning. The session is committed once per company.

    Args:
        companies: The companies data dump from the MongoDB.
        db: A session to connect to an SQL db via SQLAlchemy.

    Raises:
        Exception: Whatever ``get_company_id`` raises is logged and re-raised
            after the session has been rolled back.
    """
    # ``or {}`` guards against a ``yearly_results`` key that is explicitly set to None.
    total: int = sum(len(company.get("yearly_results") or {}) for company in companies)
    with tqdm(
        total=total,
        desc="Company financial reports added",
    ) as pbar:
        for company in companies:
            yearly_results: dict[str, dict] = company.get("yearly_results") or {}
            if not yearly_results:
                continue
            try:
                company_id: int = get_company_id(
                    company["name"],
                    company["location"]["zip_code"],
                    company["location"]["city"],
                    db=db,
                )
            except Exception:
                logger.exception("The company could not be identified.")
                pbar.update(len(yearly_results))
                db.rollback()
                raise
            for year, report in yearly_results.items():
                # Every entry is part of the total, so skipped ones advance the bar as well.
                pbar.update()
                if not report:
                    continue
                try:
                    year_int = int(year)
                except ValueError:
                    logger.warning(
                        f"The company {company['name']} has a yearly result with an invalid year of \"{year}\".",
                    )
                    continue
                add_annual_report(company_id, year_int, report, db=db)
            db.commit()
    logger.info("Company financial reports added.")
def transfer_data(db: Session | None = None) -> None:
"""This functions transfers all the data from a production environment to a staging environment."""
if db is None:
db = get_session(JsonFileConfigProvider("./secrets.json"))
logger.remove()
logger.add(
sys.stdout,
level="INFO",
catch=True,
format="{time:YYYY-MM-DD HH:mm:ss} {level} {message}",
)
logger.add("data-transfer.log", level="INFO", retention=5)
reset_all_tables(db)
mongo_connector = MongoConnector(
JsonFileConfigProvider("./secrets.json").get_mongo_connection_string()
)
@ -354,8 +429,14 @@ def transfer_data(db: Session | None) -> None:
companies: list[dict[str, Any]] = mongo_company.get_all() # type: ignore
del mongo_company
if db is None:
db = get_session(JsonFileConfigProvider("./secrets.json"))
reset_all_tables(db)
add_companies(companies, db)
add_relationships(companies, db)
add_annual_financial_reports(companies, db)
db.close()

View File

@ -69,16 +69,25 @@ class AnnualFinanceStatement(Base):
id = sa.Column(sa.Integer, primary_key=True)
company_id = sa.Column(sa.Integer, sa.ForeignKey("company.id"))
date = sa.Column(sa.DateTime(timezone=True), nullable=False)
total_volume = sa.Column(sa.Float)
ebit = sa.Column(sa.Float)
ebitda = sa.Column(sa.Float)
ebit_margin = sa.Column(sa.Float)
total_balance = sa.Column(sa.Float)
equity = sa.Column(sa.Float)
debt = sa.Column(sa.Float)
return_on_equity = sa.Column(sa.Float)
capital_turnover_rate = sa.Column(sa.Float)
date = sa.Column(sa.Date, nullable=False)
total_volume = sa.Column(sa.Float, default="NaN")
ebit = sa.Column(sa.Float, default="NaN")
ebitda = sa.Column(sa.Float, default="NaN")
ebit_margin = sa.Column(sa.Float, default="NaN")
total_balance = sa.Column(sa.Float, default="NaN")
equity = sa.Column(sa.Float, default="NaN")
debt = sa.Column(sa.Float, default="NaN")
return_on_equity = sa.Column(sa.Float, default="NaN")
capital_turnover_rate = sa.Column(sa.Float, default="NaN")
current_liabilities = sa.Column(sa.Float, default="NaN")
dividends = sa.Column(sa.Float, default="NaN")
net_income = sa.Column(sa.Float, default="NaN")
assets = sa.Column(sa.Float, default="NaN")
long_term_debt = sa.Column(sa.Float, default="NaN")
short_term_debt = sa.Column(sa.Float, default="NaN")
revenue = sa.Column(sa.Float, default="NaN")
cash_flow = sa.Column(sa.Float, default="NaN")
current_assets = sa.Column(sa.Float, default="NaN") # assets vs current assets
# company: Mapped[Company] = relationship(Company)

View File

@ -13,6 +13,7 @@ from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from aki_prj23_transparenzregister.utils import data_transfer
from aki_prj23_transparenzregister.utils.sql import entities
@pytest.mark.parametrize(
@ -326,7 +327,7 @@ def test_add_company_broken_date(
@pytest.mark.parametrize("seed", list(range(5)))
@pytest.mark.parametrize("overwrite", ["", None, " "])
def test_add_company_broken_district_court(
seed: int, overwrite: str | None, full_db: Session, mocker: MockerFixture
seed: int, overwrite: str | None, full_db: Session
) -> None:
"""Test a broken district court entry."""
company = company_generator(seed)
@ -595,7 +596,6 @@ def test_add_relationships_none(empty_relations: list, full_db: Session) -> None
data_transfer.add_relationships([], full_db)
@pytest.mark.working_on()
@pytest.mark.parametrize(
"documents",
[
@ -742,3 +742,265 @@ def test_relationships(documents: list[dict[str, Any]], full_db: Session) -> Non
}
),
)
@pytest.mark.parametrize(
    "companies",
    [
        [],
        [{}],
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [],
                "yearly_results": {2023: {}, "2023": {}},
            },
        ],
    ],
)
def test_add_annual_financial_reports_no_call(
    companies: list[dict], full_db: Session, mocker: MockerFixture
) -> None:
    """Tests that no report is added and no warning is logged if all yearly results are empty."""
    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )
    data_transfer.add_annual_financial_reports(companies, full_db)

    # Check the patched function itself; ``mocker.call`` is only the generic call helper.
    mocked.assert_not_called()
    spy_warning.assert_not_called()
    spy_info.assert_called_once()
@pytest.mark.parametrize(
    "companies",
    [
        [
            {
                "_id": {"$oid": "649f16a2ecc"},
                "id": {
                    "hr_number": "HRB 123",
                    "district_court": {
                        "name": "Amtsgericht Dortmund",
                        "city": "Dortmund",
                    },
                },
                "location": {
                    "city": "TV City",
                    "zip_code": "12345",
                    "street": "Sesamstr.",
                    "house_number": "1",
                },
                "name": "Some Company GmbH",
                "last_update": "2023-05-04",
                "relationships": [],
                "yearly_results": {"i am not an int": {"auditor": {}}},
            }
        ],
    ],
)
def test_add_annual_financial_reports_defect_year(
    companies: list[dict], full_db: Session, mocker: MockerFixture
) -> None:
    """Tests that a yearly result with a non-integer year is skipped with exactly one warning."""
    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )
    data_transfer.add_annual_financial_reports(companies, full_db)

    # The broken year must never reach the function that writes the report.
    mocked.assert_not_called()
    spy_warning.assert_called_once()
    spy_info.assert_called_once()
def test_add_annual_financial_reports(full_db: Session, mocker: MockerFixture) -> None:
    """Tests that every yearly result with a valid year is handed to add_annual_report in order."""
    companies = [
        {
            "_id": {"$oid": "649f16a2ecc"},
            "id": {
                "hr_number": "HRB 123",
                "district_court": {
                    "name": "Amtsgericht Dortmund",
                    "city": "Dortmund",
                },
            },
            "location": {
                "city": "TV City",
                "zip_code": "12345",
                "street": "Sesamstr.",
                "house_number": "1",
            },
            "name": "Some Company GmbH",
            "last_update": "2023-05-04",
            "relationships": [],
            "yearly_results": {
                2023: {"some-text1": {}},
                "cast-me-to-int": {"some-text4": {}},
                2025: {"some-text2": {}},
                "cast-me-too": {"some-text5": {}},
                2024: {"some-text3": {}},
            },
        }
    ]
    # The two string keys above can not be cast to an int and should each cause one warning.
    invalid_year_count = 2
    expected_reports = [
        (2023, {"some-text1": {}}),
        (2025, {"some-text2": {}}),
        (2024, {"some-text3": {}}),
    ]

    spy_warning = mocker.spy(data_transfer.logger, "warning")
    spy_info = mocker.spy(data_transfer.logger, "info")
    mocked = mocker.patch(
        "aki_prj23_transparenzregister.utils.data_transfer.add_annual_report"
    )
    data_transfer.add_annual_financial_reports(companies, full_db)

    assert spy_warning.call_count == invalid_year_count
    # strict=True makes the test fail if a report is missing or an unexpected one was added.
    for (year, report), input_args in zip(
        expected_reports, mocked.call_args_list, strict=True
    ):
        assert input_args.args[0] == 1
        assert input_args.args[1] == year
        assert input_args.args[2] == report
        assert len(input_args.kwargs) == 1
        assert isinstance(input_args.kwargs["db"], Session)
    spy_info.assert_called_once()
@pytest.mark.parametrize("year", list(range(2000, 2025, 5)))
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize(
    "empty_report",
    [{}, {"auditors": []}, {"financials": []}, {"auditors": [], "financials": []}],
)
def test_add_annual_report_empty(
    year: int,
    company_id: int,
    empty_report: dict,
    full_db: Session,
    mocker: MockerFixture,
) -> None:
    """Tests that an empty report causes one debug log entry and leaves the table unchanged."""
    table_name = entities.AnnualFinanceStatement.__tablename__
    df_prior = pd.read_sql_table(table_name, full_db.bind)  # type: ignore
    spy_debug = mocker.spy(data_transfer.logger, "debug")

    data_transfer.add_annual_report(company_id, year, empty_report, full_db)
    full_db.commit()

    spy_debug.assert_called_once()
    # Snapshot the table again: nothing may have been written for an empty report.
    df_after = pd.read_sql_table(table_name, full_db.bind)  # type: ignore
    pd.testing.assert_frame_equal(df_prior, df_after)
@pytest.mark.parametrize("year", [2015, 2023, 2024])
@pytest.mark.parametrize("company_id", [7, 8, 9])
@pytest.mark.parametrize(
    "empty_report",
    [{}, {"auditors": []}, {"financials": []}, {"auditors": [], "financials": []}],
)
def test_add_annual_report_to_unknown_company(
    year: int, company_id: int, empty_report: dict, full_db: Session
) -> None:
    """Tests that an empty report for a company id missing in the db raises a KeyError."""
    expected_message = "The company with the id .* could not be found."
    with pytest.raises(KeyError, match=expected_message):
        data_transfer.add_annual_report(company_id, year, empty_report, full_db)
@pytest.mark.parametrize("company_id", [1, 2, 3])
@pytest.mark.parametrize("year", [2023, 2025, 2020])
@pytest.mark.parametrize("short_term_debt", [2023.2, 2025.5, 2020.5, float("NaN")])
def test_add_annual_report(
    short_term_debt: float, company_id: int, year: int, full_db: Session
) -> None:
    """Tests that a report with financials is stored as one row, with unset values as NaN."""
    report = {
        "financials": {
            "ebit": 123,
            "ebitda": 235,
            "short_term_debt": short_term_debt,
        },
        "auditors": {},
    }
    data_transfer.add_annual_report(company_id, year, report, db=full_db)
    full_db.commit()

    # Listed in the column order of the table, since the frame comparison is order-sensitive.
    financial_columns = [
        "total_volume",
        "ebit",
        "ebitda",
        "ebit_margin",
        "total_balance",
        "equity",
        "debt",
        "return_on_equity",
        "capital_turnover_rate",
        "current_liabilities",
        "dividends",
        "net_income",
        "assets",
        "long_term_debt",
        "short_term_debt",
        "revenue",
        "cash_flow",
        "current_assets",
    ]
    expected_row: dict[str, Any] = {
        "id": 1,
        "company_id": company_id,
        "date": pd.to_datetime(date(year, 1, 1)),
        **dict.fromkeys(financial_columns, float("NaN")),
    }
    # Overwriting existing keys keeps their position, so the column order is untouched.
    expected_row["ebit"] = 123.0
    expected_row["ebitda"] = 235.0
    expected_row["short_term_debt"] = short_term_debt

    df_stored = pd.read_sql_table(
        entities.AnnualFinanceStatement.__tablename__, full_db.bind  # type: ignore
    )
    pd.testing.assert_frame_equal(pd.DataFrame([expected_row]), df_stored)
def test_add_annual_report_financial_key_error(full_db: Session) -> None:
    """Tests that a financial record of an unknown type is rejected with a TypeError."""
    broken_report = {"financials": {"something-strange": 123.12}, "auditors": {}}
    expected_message = "is an invalid keyword argument for AnnualFinanceStatement"
    with pytest.raises(TypeError, match=expected_message):
        data_transfer.add_annual_report(2, 2023, broken_report, db=full_db)