Data transfer script (#114)

Transfers data between two SQL instances.
Limited in data volume. Should be good enough for now.

---------

Co-authored-by: Tim <tim.ronneburg@outlook.de>
This commit is contained in:
2023-09-11 21:10:36 +02:00
committed by GitHub
parent d64f53eca9
commit ba44b082b8
4 changed files with 62 additions and 5 deletions

View File

@ -14,7 +14,7 @@ See the [CONTRIBUTING.md](CONTRIBUTING.md) about how code should be formatted an
## DB Connection settings
To connect to the SQL db see [sql/connector.py](./src/aki_prj23_transparenzregister/utils/postgres/connector.py)
To connect to the SQL db see [sql/connector.py](./src/aki_prj23_transparenzregister/utils/sql/connector.py)
To connect to the Mongo db see [connect]
Create a `secrets.json` in the root of this repo with the following structure (values to be replaced by desired config):
@ -38,8 +38,10 @@ Create a `secrets.json` in the root of this repo with the following structure (v
}
```
Alternatively, the secrets can be provided as environment variables. One option to do so is to add a `.env` file with the following layout:
```ini
Alternatively, the secrets can be provided as environment variables. One option to do so is to add a `.env` file with
the following layout:
```
PYTHON_POSTGRES_USERNAME=postgres
PYTHON_POSTGRES_PASSWORD=postgres
PYTHON_POSTGRES_HOST=localhost

View File

@ -1,11 +1,13 @@
"""Module containing connection utils for PostgreSQL DB."""
import re
import pandas as pd
import sqlalchemy as sa
from loguru import logger
from sqlalchemy.engine import URL, Engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker
from sqlalchemy.pool import SingletonThreadPool
from tqdm import tqdm
from aki_prj23_transparenzregister.config.config_providers import (
ConfigProvider,
@ -83,11 +85,31 @@ def init_db(db: Session) -> None:
def reset_all_tables(db: Session) -> None:
    """Drops all SQL tables and recreates them.

    Every table registered on ``Base.metadata`` is dropped on the engine the
    session is bound to; afterwards ``init_db`` is called on the same session
    to set the tables up again. All data in those tables is lost.

    Args:
        db: A session bound to the database whose tables should be reset.
    """
    # The message is backend-agnostic on purpose: the session may be bound to
    # any SQL backend, not only PostgreSQL.
    logger.info("Resetting all SQL tables.")
    Base.metadata.drop_all(db.bind)
    init_db(db)
@logger.catch(reraise=True)
def transfer_db(*, source: Session, destination: Session) -> None:
    """Transfers the data from one db to another db.

    The destination is reset first (all tables of ``Base.metadata`` are dropped
    and recreated), so any data it previously held is lost. Afterwards every
    table is read completely from the source and appended to the matching
    table in the destination.

    Each table is loaded into memory as a whole, so the transferable data
    volume is limited by the available memory.

    Args:
        source: A session to a source db data should be copied from.
        destination: A session to a db where the data should be copied to.
    """
    # reset_all_tables already finishes with init_db(destination), so no
    # additional init_db call is needed here.
    reset_all_tables(destination)

    sbind = source.bind
    dbind = destination.bind
    # Narrow the bind type for the type checker; pandas is handed engines below.
    assert isinstance(sbind, Engine)  # noqa: S101
    assert isinstance(dbind, Engine)  # noqa: S101

    # sorted_tables is used so that tables are written in metadata order;
    # tqdm only adds a progress bar around the iteration.
    for table in tqdm(Base.metadata.sorted_tables):
        table_name = str(table)
        pd.read_sql_table(table_name, sbind).to_sql(
            table_name, dbind, if_exists="append", index=False
        )
if __name__ == "__main__":
    """Main flow creating tables"""
    # Script entry point: read the connection settings from the local
    # secrets file, open a session with them and initialise the tables.
    config_provider = JsonFileConfigProvider("./secrets.json")
    session = get_session(config_provider)
    init_db(session)

View File

@ -1 +0,0 @@
"""Tests for data_extraction."""

View File

@ -4,15 +4,19 @@ from collections.abc import Generator
from typing import Any
from unittest.mock import Mock, patch
import pandas as pd
import pytest
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from aki_prj23_transparenzregister.config.config_providers import JsonFileConfigProvider
from aki_prj23_transparenzregister.config.config_template import PostgreConnectionString
from aki_prj23_transparenzregister.utils.sql.connector import (
Base,
get_pg_engine,
get_session,
init_db,
transfer_db,
)
@ -27,6 +31,36 @@ def test_get_engine_pg() -> None:
assert get_pg_engine(conn_args) == result
@pytest.fixture()
def destination_db() -> Generator[Session, None, None]:
    """Provides a session to a fresh SQLite db that data can be copied into.

    A leftover ``secondary.db`` file from an earlier run is removed before the
    session is created. After the test the session is closed, its engine is
    disposed and the file is deleted again.
    """
    db_file = "secondary.db"
    # Start from a clean slate in case a previous run left the file behind.
    if os.path.exists(db_file):
        os.remove(db_file)

    session = get_session("sqlite:///secondary.db")
    init_db(session)
    yield session

    # Teardown: release every handle on the file before deleting it.
    session.close()
    engine = session.bind
    assert isinstance(engine, Engine)
    engine.dispose()
    os.remove(db_file)
def test_transfer_db(full_db: Session, destination_db: Session) -> None:
    """Checks that transfer_db copies every table from one db to another.

    After the transfer each table of ``Base.metadata`` is read from both
    databases and the resulting frames are compared for equality.
    """
    transfer_db(source=full_db, destination=destination_db)

    source_engine = full_db.bind
    target_engine = destination_db.bind
    assert isinstance(source_engine, Engine)
    assert isinstance(target_engine, Engine)

    for table_name in map(str, Base.metadata.sorted_tables):
        copied = pd.read_sql_table(table_name, target_engine)
        expected = pd.read_sql_table(table_name, source_engine)
        pd.testing.assert_frame_equal(copied, expected)
@pytest.fixture()
def delete_sqlite_table() -> Generator[str, None, None]:
"""Cleans a path before and deletes the table after a test.