docs: Abschlussarbeit on data ingestion (#500)

Co-authored-by: Philipp Horstenkamp <philipp@horstenkamp.de>
This commit is contained in:
Tristan Nolde 2024-01-02 15:50:58 +01:00 committed by GitHub
parent a29fa74a40
commit 98a189b94c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1005 additions and 0 deletions

View File

@ -0,0 +1,53 @@
### 4.2 Lösungsarchitektur
Im Verlauf der Entwicklung stellte sich heraus, dass eine zentrale Stelle zur Datenspeicherung benötigt wird, damit Entwickler Zugang zu den Daten sowohl im Staging-Bereich als auch in der Produktiv-Form haben. Die bereits in der ersten Phase des Projektes angelegte MongoDB im Atlas Hosting (internet-facing) wurde daher durch eine auf der FH-Infrastruktur gehostete Konstruktion ersetzt, die aus einer MongoDB und einer PostgreSQL-Datenbank besteht:
<img src="images/fh_swf_k8s.PNG" alt="Überblick über die Netzwerkarchitektur des FH K8s Clusters" width=600 height=350 />
Da es sich bei dem zur Verfügung gestellten System um einen Kubernetes (K8s) Host handelt, wurden entsprechende Manifeste angelegt und auf dem Server bereitgestellt.
Aufgrund der in der FH eingesetzten Netzwerkarchitektur war es jedoch nur möglich, den Server von außen über eine VPN-Verbindung (ForcePoint) zu erreichen. Dies stellte zwar keine große Herausforderung für die Projektteilnehmer dar, blockierte jedoch die Anbindung des selbst gehosteten Apache Airflow Servers. Daher wurden die regelmäßig bezogenen News-Artikel weiterhin in die im Internet stehende MongoDB gespeist und etwa einmal im Monat manuell in die FH-Infrastruktur überführt.
Nach dem Aufsetzen des Development Environments und den ersten Entwicklungsfortschritten galt es, eine Zielarchitektur zu bestimmen. Aufgrund der Vorteile in Portabilität und Unabhängigkeit von den Gegebenheiten verschiedener Clients wurde sich für eine Docker-basierte Lösung entschieden. Die einzelnen Docker Container, respektive Images, wurden anhand ihrer Anwendungsdomäne sowie des geplanten Datenflusses geschnitten:
<img src="images/docker_container_split.PNG" height=250/>
Die Aufgaben der Container wurden wie folgt verteilt:
1. **Data Ingestion:**
- Beziehen von Unternehmensdaten sowie News-Artikeln
- Aufbereitung dieser Daten
- Überführung in die Staging DB zur weiteren Verarbeitung
2. **Staging DB:**
- Persistierung der "Roh-Daten"
- Fachliche Trennung von Darstellung und Analyse
3. **Data Loader:**
- Transformation der "Roh-Daten" in das für die Visualisierung benötigte Ziel-Format
- Durchführung weiterer Analyse-Bausteine (unter anderem Sentiment)
- Überführung in die ProdDB
4. **ProdDB:**
- Persistierung der Daten im Schema für die Visualisierung
5. **Dashboard Server:**
- Webserver zur Bereitstellung der UI
- Dazugehörige Geschäftslogik (z.B. Aufbereitung der Graphen)
Diese Aufteilung entstand unter anderem nach der Identifikation der verschiedenen Aufgaben-Domänen sowie ihrer Features:
<img src="images/domains.PNG" />
Im Detail:
<img src="images/domains_details.PNG" />
Auch für dieses Konstrukt galt es, eine Ausführungsumgebung zu bestimmen. Da sich der Umgang mit K8s für die Gruppe aufgrund mangelnder Erfahrung als nicht durchführbar erwies, wurde stattdessen auf den hinter [jupiter.fh-swf](https://jupiter.fh-swf.de) liegenden Server zurückgegriffen. Bei diesem handelt es sich um einen Docker Swarm Host, auf dem bereits unterstützende Tools wie NGINX oder Portainer installiert sind.
<img src="images/portainer.PNG" height=450/>
Für das Deployment selbst wurde zunächst neben dem automatisierten Bau der verwendeten Docker Images eine [`docker-compose.yml`](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/docker-compose.yml) entwickelt. Diese Datei erfasst alle benötigten Komponenten, verknüpft sie über entsprechende Konfigurationen des Docker-Netzwerks und Umgebungsvariablen miteinander und steuert schließlich die Verteilung auf dem Swarm Cluster. Um eine automatisierte Bereitstellung der Services sowie Aktualisierung dieser zu ermöglichen, wurde mit Portainer ein neues Deployment namens `transparenzregister` erstellt und mit dem Code-Repository verknüpft. Auf diese Weise überprüft Portainer in regelmäßigen Intervallen, ob relevante Änderungen im Repository vorliegen sei es eine direkte Modifikation an der `docker-compose.yml` oder die Bereitstellung eines neueren Docker Images. Bei einer solchen Erkennung startet Portainer automatisch das Deployment erneut. Zudem wurde die NGINX-Konfiguration des Jupiter Servers erweitert, und der Webserver des Transparenzregisters so unter [https://jupiter.fh-swf.de/transparenzregister](https://jupiter.fh-swf.de/transparenzregister) dem Internet zur Verfügung gestellt. Die damit verbundenen Anpassungen an der Konfiguration des Deployments, wie etwa die Einführung des `BASE_PATHs` */transparenzregister* und die Integration von Basic Auth, um den Service vor Zugriff Externer zu schützen, wurden ebenfalls über die `docker-compose.yml` ermöglicht.
Auch wenn das Deployment anfangs wie gewünscht ablief, traten im Laufe der Zeit einige Probleme auf. Die hohe Frequenz an Änderungen am `main`-Branch, der vom automatisierten Deployment überwacht wird, durch die Integration von Dependabot sorgte dafür, dass schnell aufeinander folgende Deployments auf dem Server gestartet wurden und somit eine hohe Downtime der Services entstand. Ebenfalls hatte dies den Seiteneffekt, dass Unmengen an Docker Images erstellt und auf den Server transportiert wurden. Dies hatte in Kombination mit der ohnehin hohen Anzahl an teilweise veralteten Images zur Folge, dass der Speicherplatz auf dem Server an seine Grenzen geriet. Dieser Herausforderung hätte durch eine feinere Steuerung des Deployments begegnet werden können. Unter anderem hätten Änderungen erst auf einem `develop`- oder `release`-Branch gesammelt und erst bei Vollendung mehrerer Features oder relevanter Änderungen (z.B. Aktualisierung eingesetzter Bibliotheken) auf den `main`-Branch überführt und somit in das Deployment übergeben werden können. Eine erweiterte Versionierungsstrategie der Images (z.B. nach Semantic Versioning) hätte ebenfalls unterstützen können. Alternativ hätten Vorkehrungen getroffen werden müssen, um die auf dem Jupiter Server abgelegten Docker Images regelmäßig aufzuräumen.
Für die lokale Entwicklung sowie das Testen wurde zusätzlich eine [`local-docker-compose.yml`](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/local-docker-compose.yml) angelegt. Diese Datei beinhaltet den grundlegenden Aufbau des Produktiv-Deployments, wurde jedoch so angepasst, dass auf ein lokales Build der Docker Images aufgebaut wird. Zudem wurde ein zusätzlicher Container eingeführt, der die Staging DB zu Beginn mit einem Snapshot der eingesetzten Daten füllt, um somit den gesamten Datenfluss abspielen zu können. Näheres zum Aufsetzen wird in der [README](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/README.md) beschrieben. Um den kompletten Build Process neu zu starten, wurde zusätzlich die [rebuild-and-start.bat](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/rebuild-and-start.bat) angelegt, welche die dort beschriebenen Schritte in einem Skript kapselt.

View File

@ -0,0 +1,183 @@
#### Data Extraction
Um die gewünschten Analysen und Visualisierungen zu ermöglichen, wurden verschiedene Implementierungen geschaffen. Diese übernehmen einerseits den Bezug von Rohdaten aus verschiedenen Quellen aber auch die ersten Aufbereitungsschritte und letztlich Persistierung in der Staging DB.
##### Extraktion von News
Vor der Implementierung der News-Extraktion wurde zunächst analysiert, welche Informationen bzw. Attribute eines Artikels für die Applikation interessant sein könnten. Dabei wurden folgende Felder festgelegt:
- ID
Identifizierendes Merkmal für einen einzelnen Artikel, um Referenzen herstellen zu können bzw. Duplikate zu vermeiden von der Quelle festzulegen.
- Title
Titel des Artikels im Klartext.
- Date
Veröffentlichungsdatum des Artikels im ISO 8601 Format.
- Text
Inhalt des Artikels im Klartext.
- Source URL
URL des Artikels, um Nutzer darauf verweisen zu können.
Nachdem das Zielformat der Newsartikel festgelegt wurde, bestand die Aufgabe, diese Informationen aus den ausgewählten Quellen zu extrahieren. Um eine einheitliche und leicht erweiterbare Lösung zu schaffen, wurde zunächst die folgende Software-Architektur geplant:
<img src="images/news_class_diagram.PNG" height=400/>
Über die Methode get_news_for_category wird ein Interface geschaffen, über welches die darüberliegende Applikation News aus der in den Kind-Klassen implementierten Quelle beziehen kann. So soll es möglich sein, in der Applikation keine Spezifika der Datenquellen implementieren zu müssen, sondern lediglich auf die bereits kapselnde Klasse zurückzugreifen.
Über den `BaseNewsExtractor` wird ferner eine geschützte Methode veranlasst, die es den Kind-Klassen ermöglicht, die von der jeweiligen Quelle bezogenen Daten zu parsen, aufzubereiten und letztlich in eine Liste von News-Objekten zu überführen. Mittels dieser Struktur könnten in Zukunft problemlos weitere Quellen angebunden werden, ohne große Änderungen am Verlauf der Haupt-Applikation vornehmen zu müssen. Es müsste lediglich die neue Klasse importiert und ein Objekt davon erstellt werden (Stichwort: "Programming To an Interface").
In der Applikation `fetch_news` wird darauf aufbauend die Überführung der von den `BaseNewsExtractor` erbenden Klassen bezogenen News in die Staging DB behandelt.
![](images/fetch_news.PNG)
##### Stammdaten-Extraktion
Die Extraktion von Unternehmens-Stammdaten wurde zunächst gemäß des ETL-Lebenszykluses in drei Sub-Domänen unterteilt:
1. [Extract](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/extract.py)
Download der Rohdaten aus dem Unternehmensregister
2. [Transform](https://github.com/fhswf/aki_prj23_transparenzregister/tree/main/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/transform)
Aufbereitung und Bereinigung der Rohdaten sowie Überführung in das Zielformat
3. [Load](https://github.com/fhswf/aki_prj23_transparenzregister/blob/main/src/aki_prj23_transparenzregister/utils/data_extraction/unternehmensregister/load.py)
Einfügen bzw. Aktualisieren der Einträge in der Staging DB
<img src="images/etl_unternehmensregister.PNG" />
Da das Unternehmensregister weder über eine API verfügt, noch bereits Python-Bibliotheken existieren, die den Umgang mit dem Unternehmensregister erleichtern, musste eine eigene Lösung von Grund auf entwickelt werden. Aufgrund der Beschaffenheit des Unternehmensregisters bestand die einzige Lösung in der Automatisierung eines Bots, der durch Klicks die Website des Unternehmensregisters steuert und sich so zum Download der gewünschten Informationen, dem sog. "Strukturierten Inhalt" den Stammdaten eines Unternehmens in XML , arbeitet.
Das Ziel dieses Schrittes war es, eben jene Dateien in einen dafür vorgesehenen Ordner auf dem ausführenden Client zu überführen, auf den dann im Folgeschritt zugegriffen werden kann.
![](images/unternehmensregister_scrape.PNG)
Da die Laufzeit dieses Prozesses aufgrund der zwischengelagerten Warteintervalle auf der Website recht hoch war, wurde zunächst Abstand von einem kompletten Abbild des Unternehmensregisters genommen. Stattdessen wurde eine Verarbeitung der 100 größten deutschen Unternehmen laut Statista gestartet. Zusätzlich wurde die Verarbeitung so strukturiert und gekapselt, dass durch Multi-Processing mehrere Unternehmen parallel verarbeitet werden konnten. Hierzu wird für jede Suche eine eigene Instanz eines Headless Chrome Browsers und Selenium gestartet, so dass sich die Prozesse nicht gegenseitig in die Quere kommen können.
In der darauffolgenden Transformation wird jedes XML Dokument eingelesen und in das Zielformat der `Company` überführt:
<img src="images/data_model_company.PNG" />
Details können dem Quell-Code entnommen werden, im Groben werden Informationen wie folgt entnommen:
- ID
- HR Nummer: Aktenzeichen des Unternehmens
- Amtsgericht: 2. "Beteiligung" Eintrag
- Name: 1. "Beteiligung" Eintrag
- Location: 1. "Beteiligung" Eintrag
- Last Update: "Letzte Eintragung"
- Company Type: "Rechtsform" Eintag; alternativ: Suffix des Unternehmensnamens
- Capital: "Zusatzangaben" Eintrag
- Business Purpose: "Gegenstand oder Geschäftszweck" Eintrag
- Stakeholder: "Beteiligung" Eintrag >= 2
- Fouding Date: "Tag der ersten Eintragung" oder "Gesellschaftsvertrag vom" oder "Gruendungsdatum" Eintrag
Da viele der oben genannten Angaben nicht immer zur Verfügung stehen, wurden zahlreiche der Attribute optional bzw. als "nullable" gesetzt. Insgesamt wurden viele Probleme in den Daten identifiziert. Unter anderem sind Personen nicht immer eindeutig referenziert, sondern werden lediglich über ihren Namen und gegebenenfalls Geburtsdatum identifiziert. Dies stellt jedoch nur eine indirekte und möglicherweise unzuverlässige Methode zur Identifizierung dar.
Auch bei Adressen traten wiederholt Probleme auf, da Straßennamen in unterschiedlichen Einträgen unterschiedlich geschrieben wurden oder Hausnummer und Straße nicht sauber getrennt wurden. In der Implementierung sind daher zahlreiche vorgelagerte Überprüfungen und weitere Aufbereitungen zu finden, um die Daten sauber übergeben zu können.
Nach einiger Zeit jedoch traten plötzliche Fehler in den Routinen zur Extraktion von Stammdaten auf. Eine Analyse der Logs sowie der bezogenen und transformierten Daten zeigte, dass das Unternehmensregister nicht, wie ursprünglich angenommen, ein einheitliches Format pflegt. Stattdessen gibt es den sogenannten "Strukturierten Inhalt" zwar immer im XML-Format aus, jedoch kann das darunterliegende Schema je nach Alter des Eintrags unterschiedlich sein. Nähere Details zum Problem sind im Issue [Issue #233](https://github.com/fhswf/aki_prj23_transparenzregister/issues/233) zu finden.
xJustizVersion v1
```json
{
"XJustiz_Daten": {
"@xmlns": "http://www.xjustiz.de",
"@xmlns:xdo": "http://www.xdomea.de",
"@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
"@xsi:schemaLocation": "http://www.xjustiz.de xj_0400_register_1_9.xsd",
"Grunddaten": {
"@XJustizVersion": "1.20.0",
...
}
```
xJustizVersion v3
```json
{
"tns:nachricht.reg.0400003": {
"@xmlns:tns": "http://www.xjustiz.de",
"@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
"@xsi:schemaLocation": "http://www.xjustiz.de xjustiz_0400_register_3_2.xsd",
"tns:nachrichtenkopf": {
"@xjustizVersion": "3.3.1",
"tns:aktenzeichen.absender": null,
...
```
Nicht nur unterscheiden sich die Schlüssel zwischen den Versionen, sondern auch die Wertebereiche und der Einsatz von Enums zeigen signifikante Unterschiede.
Die Aufbereitungsroutine wurde daher überarbeitet. Ähnlich der Lösung mit dem `BaseNewsExtractor` aus der News-Verarbeitung wurde hier ebenfalls eine Oberklasse (`BaseTransformer`) eingeführt. Diese wird von den Implementierungen `v1` oder `v3` abgeleitet, die jeweils das gleichnamige Datenschema kapseln. Im BaseTransformer werden verschiedene Submethoden definiert, die jeweils einen Aufbereitungsschritt repräsentieren. Diese müssen von der entsprechenden Kindklasse gemäß des Datenschemas implementiert werden.
Die Methode `map_unternehmensregister_json` dient dabei als zentraler Einstiegspunkt für die darüberliegende Applikation. Generelle Funktionen wie zum Beispiel das Parsen von Datumsangaben oder die Normalisierung von Strings werden zentral zur Verfügung gestellt und können von allen Implementierungen verwendet werden.
<img src="images/transformer_class_diagram.PNG" height=400/>
Die `main`-Funktion des übergeordneten `transform`-Moduls bestimmt, welche Implementierung verwendet werden soll, indem sie den Header der Daten (Beispiele siehe oben) ausliest. Anschließend übergibt sie die Daten entweder an die `v1`- oder `v3`-Implementierung und speichert die Ergebnisse als JSON-Dateien in einem separaten Ordner.
Dieser Ordner wird schließlich von der Lade-Routine aufgegriffen, und jedes Dokument wird in die Staging DB überführt. Dabei werden neue Einträge anhand der ID ermittelt komplett neu angelegt, und Felder bestehender Artikel werden aktualisiert. Es findet kein vollständiger Austausch statt, der bereits ermittelte Finanzergebnisse überschreiben würde.
##### Extraktion von Finanzdaten
Finanzdaten und Kennzahlen der Unternehmen werden der Öffentlichkeit zentral über den [Bundesanzeiger](https://www.bundesanzeiger.de/) in Form von Jahresabschlüssen sowie der Veröffentlichung relevanter Bekanntmachungen, wie Änderungen im Aufsichtsrat oder der Terminierung des Unternehmens, zur Verfügung gestellt. Leider wird keine einfache API bereitgestellt, die es ermöglichen würde, gezielt gewünschte KPIs zu einem ausgewählten Unternehmen zu beziehen. Stattdessen existiert lediglich eine Python-Bibliothek namens [deutschland](https://github.com/bundesAPI/deutschland), in der eine Reihe von behördlichen Portalen gekapselt und bestimmte Interaktionen ermöglicht werden.
In dieser Bibliothek lässt sich bereits ein Paket finden, das für die Interaktion mit dem Bundesanzeiger zuständig ist. Dieses stellt wiederum eine Methode bereit, mit der eine Suche nach einem gegebenen Begriff durchgeführt und die Dokumente der **ersten** Seite ausgelesen werden können. Das resultierende Array enthält Datenobjekte mit Titel, Rohform des Inhalts (HTML) sowie dem reinen Text des Dokuments.
Die dabei gefundenen Elemente werden zunächst nach ihrem Typ gefiltert. Für den aktuellen Stand des Projekts sind lediglich die Jahresabschlüsse von Interesse. Nach dieser Filterung werden per Regex das Datum und das dazugehörige Geschäftsjahr extrahiert. Für jeden Eintrag werden dann die Wirtschaftsprüfer sowie die Finanzergebnisse extrahiert. Da die Angabe der Wirtschaftsprüfer über die bisher verarbeiteten Einträge konsistent ist, kann diese Information über die Struktur des Dokuments anhand der HTML-Struktur bezogen werden. Bei den Finanzdaten gestaltete sich dies etwas komplizierter:
Unter anderem ist das verwendete Format zwischen Unternehmen unterschiedlich, sowohl was die Strukturierung des Jahresabschlusses als auch die eingesetzten Fachwörter betrifft. So kann es sein, dass in einem Dokument der "Jahresumsatz in 1000€" im anderen als "Umsatz in EUR" ausgewiesen wird. In erster Instanz wurde eine Lösung entwickelt, die auf Basis von Regex in der Lage ist, eine Reihe von gewünschten Kennzahlen aus dem Text der Jahresabschlüsse zu extrahieren:
```python
kpi_patterns = {
FinancialKPIEnum.REVENUE.value: r"(?:revenue|umsatz|erlöse)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.NET_INCOME.value: r"(?:net income|jahresüberschuss|nettoeinkommen|Ergebnis nach Steuern)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBIT.value: r"(?:ebit|operating income)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.EBITDA.value: r"(?:ebitda)[:\s]*([\d,.]+[mmb]?)",
FinancialKPIEnum.GROSS_PROFIT.value: r"(?:gross profit|bruttogewinn)[:\s]*([\d,.]+[mmb]?)",
...
}
```
Diese Variante übersetzt eine Reihe möglicher Begriffe in die im Projekt verankerten Kennzahlen-Enum der Modelle. Da viele Jahresabschlüsse die gewünschten Informationen jedoch nicht direkt im Text ausweisen, sondern diese in Tabellen visualisieren, konnte diese Lösung lediglich mäßige Erfolge erzielen.
Daher wurde eine weitere Lösung entwickelt, die nicht auf Basis des Textes, sondern auf Basis von in den Jahresabschlüssen aufgeführten Tabellen agiert. In dieser Implementierung werden jeglich im Jahresabschluss gefundenen Tabellen aufbereitet, transformiert und letztlich in ein Dictionary überführt, das alle Zeilenbezeichnungen, die numerische Werte führen, als Schlüssel und den dazugehörigen Wert bereinigt auf eine einheitliche Einheit, den Euro führt. Bei dieser Lösung handelt es sich im Vergleich zur vorherigen um einen etwas freieren Ansatz, der zwar in der Lage ist, deutlich mehr Kennzahlen zu extrahieren, diese aber noch nicht auf das spätere Ziel-Format zuschneidet.
Aufgrund der damit verbundenen Änderungen an den Folgekomponenten, wie dem `Data Processing`, wurden die bisher bezogenen Daten zunächst so belassen und noch nicht mit der neuen Lösung verarbeitet. In Zukunft könnte dies jedoch bessere Ergebnisse erzielen und vor allem durch eine Überführung in die Staging DB und die Trennung zur weiteren Verarbeitung eine Lösung schaffen, die bereits weitere Kennzahlen enthält und bei Bedarf in die weiteren Schichten führen kann..
##### Dockerization & Scheduling
Im Laufe der Entwicklung und mit der Integration des eigentlichen Deployments auf den Servern der FH SWF wurde deutlich, dass ein Scheduler für die Data Extraction Jobs über Apache Airflow den gesamten Deploymentsprozess und den damit verbundenen Organisationsaufwand erheblich erhöhen würde. Im Vergleich zur zuvor vorgestellten Airflow-Instanz, die auf einem Heimserver eines Projektmitglieds in einer Umgebung gehostet wurde, die keine weiteren Applikationen anderer Gruppen zur Verfügung stellt, wäre der Aufbau der Airflow-Instanz und vor allem die nachhaltige Konfiguration, einschließlich der Einrichtung eines entsprechenden User-Managements wie etwa die Anbindung an Keycloak, zu zeitaufwendig.
Da dennoch die Anforderung besteht, bestimmte Jobs der Datenextraktion regelmäßig zu starten, wurde ein dedizierter Docker Container mit einem integrierten Scheduler entwickelt.
Dieser Container, als `ingest` in den beiden `docker-compose`-Dateien des Projekts zu finden, greift auf bestehende Applikationen/Funktionen im Bereich der Data Extraction zu und kapselt diese im Scheduler. Der Scheduler stellt eine Erweiterung zum bereits existierenden [`schedule`](https://schedule.readthedocs.io/en/stable/index.html)-Paket dar und bietet zusätzlich die Funktion, die nächste Ausführung in einer Datei zu persistieren. Dadurch überlebt er beispielsweise das erneute Deployment der Applikation und startet mit dem gleichen Intervall ausgehend von der tatsächlich letzten Ausführung. Ohne diese Funktion würde das gesetzte Intervall nach jedem Deployment von Neuem starten. Dies hätte in der Hochphase der Entwicklung dazu geführt, dass das gewünschte Intervall nie erreicht werden kann, da in der Zwischenzeit immer wieder eine neue Version deployed wird.
![](images/scheduler.PNG)
Im Kern des `ingest` stehen nun zwei Ziele:
1. Das Beziehen aktueller News-Artikel aus den bereits vorgestellten Quellen (Tagesschau API, Handelsblatt RSS Feed)
2. Das Füllen von Lücken in der Staging DB in Bezug auf Unternehmen
Der erste Job wurde bereits ausführlich in einem vorherigen Kapitel erläutert. Beim zweiten Job handelt es sich um eine Routine aus verschiedenen Schritten. Neben dem Bezug von Finanzdaten für Unternehmen, für welche noch keine solchen Daten in der Staging DB vorliegen, liegt das Hauptaugenmerk auf der Stammdaten-Extraktion für Unternehmen. Diese Unternehmen werden während der Daten-Transformation von der Staging- in die ProdDB zwar referenziert (zum Beispiel über Wirtschaftsprüfer-Beziehungen oder als Prokuristen etc.), sind jedoch als eigenständige Einträge noch nicht vorhanden. Dieser Job greift auf die bestehenden Scraping-Funktionen zurück und nutzt diese, um eben jene Daten zu beziehen. Durch den erneuten Aufruf der Daten-Transformationsroutine kann die Datenbasis so natürlich wachsen und erweitert sich täglich automatisch, ohne dass manuelles Eingreifen nötig wäre.
![](images/scheduler_extended.PNG)
Auch wenn die Eigenimplementierung des Schedulers im Vergleich zu den Möglichkeiten von Apache Airflow einen bedeutenden Rückschritt darstellt, da unter anderem grafisch aufbereitete Logs und Fehleranalysen verloren gehen, konnte durch diese Entscheidung im Rahmen des Projektes welches lediglich den Charakter eines Proof of Concepts (PoC) genießt Zeit sowie Aufwand eingespart werden. Auch die daher gesparten Entwicklungsaufwände, um den bestehenden Code in das von Airflow gewünschte Format zu überführen, entfielen.
##### Zwischenfazit
Zusammenfassend lässt sich festhalten, dass die anfänglich gesetzten Ziele der Datenextraktion nur teilweise erfüllt werden konnten. Zwar sind die grundlegenden Informationen wie Stammdaten sowie die ersten Finanzergebnisse vorhanden, jedoch fehlen viele der initial festgelegten Kennzahlen, die sich auch mit dem aktuellen Datenbestand nur partiell in der "Data Processing"-Komponente berechnen lassen. Ebenfalls fehlt grundlegend eine Historisierung der Daten, sodass immer nur eine Momentaufnahme erstellt werden kann. Sollte das Projekt weiter fortgeführt werden, so muss hier angesetzt werden, um auch den Verlauf der am Unternehmen beteiligten Personen oder weitere relevante Änderungen nachverfolgen zu können.
Nichtsdestotrotz wurde im Rahmen der Entwicklung Code geschaffen, der sich dafür eignet, in das "deutschland"-Paket als Wrapper für das Unternehmensregister eingeführt zu werden. Dass der Bedarf für eine solche Funktion besteht, zeigt ein aktuelles Issue [#132](https://github.com/bundesAPI/deutschland/issues/132).
Ebenfalls anzumerken ist die technische Hürde, die aktuell von Behörden gestellt wird, um an die gewünschten Daten zu gelangen. Es wäre wünschenswert, in Zukunft eine einfacher zugängliche Lösung z.B. auf Basis öffentlicher APIs zu sehen.

Binary file not shown.

After

Width:  |  Height:  |  Size: 646 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 193 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 116 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 365 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 450 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 137 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.5 KiB

View File

@ -0,0 +1,765 @@
---
author:
- |
Tristan Nolde\
Fachhochschule Südwestfalen\
Schriftliche Ausarbeitung im Modul\
„Projektgruppe"\
im Studiengang\
M.Sc. Angewandte Künstliche Intelligenz\
eingereicht bei\
Prof. Dr.-Ing. Doga Arinir
bibliography: literatur.bib
title: Automatische Daten Extraktion aus Web Quellen
---
# Automatische Daten Extraktion aus Web Quellen
## Erklärung
Ich erkläre hiermit, dass ich die vorliegende Arbeit selbstständig
verfasst und dabei keine anderen als die angegebenen Hilfsmittel benutzt
habe. Sämtliche Stellen der Arbeit, die im Wortlaut oder dem Sinn nach
Werken anderer Autoren entnommen sind, habe ich als solche kenntlich
gemacht. Die Arbeit wurde bisher weder gesamt noch in Teilen einer
anderen Prüfungsbehörde vorgelegt und auch noch nicht veröffentlicht.
2023-10-03
<img src="Abbildungen/unterschrift.PNG" alt="image" />
Tristan Nolde
## Einleitung
Im Projekt *Transparenzregister* steht die Analyse sowie Visualisierung
von Unternehmensdaten im Vordergrund. Zu diesen Daten gehören unter
anderem *Key Performance Indicators (KPIs)* im
Bereich Finanzen (z.B. Verlauf des Umsatzes oder Jahresüberschusses)
aber auch grundlegende Stammdaten wie den Sitz des Unternehmens oder die
Besetzung des Aufsichtsrates. Um diese Informationen jedoch darstellen
zu können, bedarf es zunächst der Beschaffung und Aufbereitung von
Daten. In der im Projekt verankerten Architektur bedeutet dies, die
Extraktion von Rohdaten aus verschiedenen Quellen, die Stücke der
benötigten Informationen beinhalten, un den Transport dieser in einem
geeigneten Format in die sog. *Staging DB*. Aus dieser heraus bedienen sich die
verschiedenen Analyse-Prozesse und stellen letztendlich einen vollkommen
aufbereitenden Datenbestand in der *Production DB* zur Verfügung.
<img src="Abbildungen/HLD.PNG" height=250 />
Diese Hausarbeit befasst sich mit der Bereitstellung eben dieser
Informationen für die *Staging DB*. Neben der initialen Vermittlung von in
der Forschung und Industrie bewährten Methoden, soll primär die
praktische Umsetzung der Daten Extraktion für die Projektgruppe im
Vordergrund stehen. Dafür werden Verfahren vorgestellt, um die
gewünschten Informationen aus verschiedenen Quellen zu beziehen. Zum
Ende der Arbeit wird ebenfalls ein erster Blick auf die rechtlichen
Rahmenbedingungen dieser Daten-Extraktion sowie ihrer Speicherung
geworfen. Da die während der Arbeit entstehenden Applikationen auch
betrieben und gewartet werden müssen, wird ein Blick auf [Apache
Airflow](https://airflow.apache.org/) als Workflow Orchestrator
geworfen.
Als Quellen für die gewünschte Datenbasis dienen unter anderem die
Folgenden:
- Unternehmensregister
- Bundesanzeiger
- Tagesschau
- Handelsblatt
### Web Mining
Das Web Mining als Unterdisziplin des Data Minings - \"\[..\] einer
Sammlung von Methoden und Vorgehensweisen, mit denen große Datenmengen
effizient analysiert werden können\" [1] - beschäftigt sich
vorwiegend mit der Auswertung von Quellen aus dem Internet. Da das
Internet zu großen Teilen aus textuellen Informationen in Form von
Websites besteht, bildet das Web Mining Schnittmengen zum Text Mining.
<img src="Abbildungen/Web_Mining.PNG" height=400 />
Besonders im Text Mining wird Wert auf die Analyse von Dokumenten in
Form von HTML oder XML Dateien gelegt. Das Web Mining legt lediglich die
Schicht der Beschaffung jener Dokumente dazu. [1]
### Extract, Transform, Load (ETL)
Ein grundlegender Prozess, der in allen Data Mining Sub-Formen Anwendung
findet ist der des *ETL*. Dieser beschreibt unabhängig von der
ursprünglichen Datenquelle oder ihrem Ziel, den Wunsch, Daten von einem
System in einer aufbereiteten Form in das Zielsystem zu transportieren.
[1]
<img src="Abbildungen/ETL.PNG" height=200 />
Da die Ergebnisse dieses Prozesses die Grundlage für Folge-Schritte wie
eine weitere Analyse oder bereits Darstellung legt, ist hier ein
besonderes Augenmerk auf die Qualität der Rohdaten sowie Ergebnisse zu
legen. Ein bekanntes Sprichwort in diesem Kontext ist *\"Garbage in,
Garbage out\"*. Darunter wird der Zustand verstanden, dass niederwertige
Rohdaten selbst mit komplexer Aufbereitung nicht in qualitativen
Ergebnissen münden können. So ist im Schritte der Extraktion besonderes
Augenmerk auf die Auswahl der Datenquelle bzw. Dokumente zu legen.
[1]
Die Bedeutung von Qualität findet sich ebenfalls in der Transformation
wieder. Hier gilt es nicht nur Daten von einen in das andere Format zu
überführen (Harmonisierung), sondern auch ungeeignete Datensätze oder
Passagen zu erkennen und nicht weiter zu betrachten (Filterung).
[1]
Zuletzt gilt es die aufbereiteten und selektierten Daten in das
Zielsystem - hier die Staging Datenbank - zu überführen. Dieser Prozess
kann je nach zeitlicher Ausführung (initiales Laden, Aktualisierung)
unterschiedlich ablaufen, da entweder neue Daten hinzugefügt oder
Bestehende angepasst werden. [1]
## Web Scraping & Crawling
Eine technische Anwendung des Web Minings ist in Form des *Web
Scrapings* (auch: Web Crawling) zu finden. Zunächst beschreibt das
Scrapen die Extraktion von Daten aus Websites. Dies kann entweder
manuell oder automatisch geschehen. Sollte das zur Automatisierung
entwickelte Programm nicht nur auf einer Zielseite verweilen, sondern
über auf der Seite gefundene Links zu weiteren Seiten vordringen, so ist
vom *Crawling* die Rede. Dieses Vorgehen ist unter anderem bei
Suchmaschinen wie Google anzutreffen. Da diese Technik unter anderem
eingesetzt werden kann, um sensible Daten wie Telefonnummern zu finden,
wurden mit der Zeit einige Mechanismen wie die *robots.txt* Datei
entwickelt, in jener der Betreiber einer Website festlegt, welche
Zugriffe erlaubt sind. [2]
Allerdings hat dieser Mechanismus eine elementare Schachstelle: Die
*robots.txt* Einträge dienen lediglich als Richtlinie für Clients, so
dass Crawler in der Lage sind diese schlichtweg zu ignorieren und
dennoch relevante Daten von der Seite zu scrapen. [3]
### Bezug von Stammdaten aus dem Unternehmensregister
Im Unternehmensregister werden veröffentlichungspflichtige Daten
deutscher Unternehmen wie etwa die Firmengründung oder Liquidation in
elektronischer Art zur Verfügung gestellt. [@unternehmensregister]
Besonders relevant, um überhaupt ein Inventar an Unternehmen aufstellen
zu können, sind die dort zu findenden Registerinformationen. In diesen
werden Daten aus Handels-, Genossenschafts- und Partnerschaftsregistern
bereit gestellt. Das Ziel ist es aus diesen grundlegende Informationen
wie den Sitz des Unternehmens, seine Gesellschaftsform sowie ggf.
verfügbare Relationen zu Personen oder anderen Unternehmen zu beziehen.
Das dazugehörige Ziel-Datenmodell ist im Anhang zu finden.
Zu Beginn der Entwicklung wird zunächst einer erster manueller Download
Prozess von Daten gestartet. Die Website des Unternehmensregisters
stellt eine schlagwort-basierte Suchmaske bereit über die der Nutzer auf
eine Historie der gefundenen Unternehmen geleitet wird. Bereits hier
stellt sich heraus, dass die Suche des Unternehmensregisters keine 1:1
Abgleich des Unternehmensnamens sondern eine ähnlichkeits-basierte Suche
verwendet. So förderte die unten dargestellte Suche nach der \"Bayer
AG\" auch Einträge wie die Bayer Gastronomie GmbH zu Tage:
<img src="Abbildungen/suche_bayer.PNG" height=450 />
Auch eine Anpassung des Suchbegriffes durch die Verwendung regulärer
Ausdrücke (z.B. \^̈Bayer AG\$)̈ kann die Qualität der Ergebnisse nicht
verbessern. Da es sich bei diesen ähnlichen Unternehmen jedoch auch
tatsächlich um Tochtergesellschaften handeln könnte, die für die
Verflechtsungsanalyse besonders interessant sind, werden nicht passende
Unternehmensnamen nicht entfernt sondern ebenfalls verarbeitet.
Ist ein Unternehmen ausgewählt, stellt das Unternehmensregister
verschiedene Format zur Verfügung. Für eine automatische Auswertung der
Daten eignet sich besonders der *SI - strukturierter Registerinhalt*,
welcher ein XML-Dokument im Format der XJustiz beinhaltet. Es wird das
Schema *xjustiz_0400_register_3_2.xsd* verwendet.
Da das Unternehmensregister keine öffentliche *Application Programming Interface (API)* hostet, über die dieses Dokument bezogen
werden kann, erfolgt eine Automatisierung des über den Nutzer in der
Web-Oberfläche getätigten Aktionsfluss mithilfe von
[Selenium](https://www.selenium.dev/).
<img src="Abbildungen/Selenium.PNG" height=400 />
Eine Schritt-für-Schritt Beschreibung des Prozesses würde den Rahmen der
Hausarbeit sprengen, daher wird ein High-Level Überblick in Form eines
Ablaufdiagramms im Anhang bereit gestellt.
Im nachgelagerten Schritt werden die heruntergeladenen *.xml* Dateien in
eine *.json* konvertiert und in das Zielformat überführt. Darauf werden
die Daten über einen zentral gelegenen Service, der die Verbindung zur
StagingDB (MongoDB) in entsprechende Methoden kapselt, in die Persistenz
überführt.
### Bezug von Finanzdaten aus dem Bundesanzeiger
Jede Kapitalgesellschaft in Deutschland ist gemäß §242 des *Handelsgesetzbuches (HGB)* dazu
verpflichtet zu Gründung der Gesellschaft sowie nach jedem abgeschlossen
Geschäftsjahr eine Aufstellung der finanziellen Situation des
Unternehmens zu veröffentlichen. Diese besteht mindestens aus der
Gewinn- und Verlustrechnung, kann jedoch je nach Gesellschaftsform
weitere Details verpflichtend beinhalten. Im Bundesanzeiger, einem
Amtsblatt sowie Online Plattform geleitet vom Bundesministerium der
Justiz, werden diese Jahresabschlüsse der Allgemeinheit zur Verfügung
gestellt.
Aus Sicht des Transparenzregisters eignet sich der Bundesanzeiger daher
als besonders gute Quelle, um Finanzdaten zu bereits bekannten
Unternehmen (siehe vorheriges Kapitel) zu beziehen, um den
wirtschaftlichen Erfolg einer Kapitalgesellschaft quantifizieren zu
können.
Auf GitHub, einem zentralen Anlaufpunkt für Open-Source Projekte, kann
eine Python Bibliothek namens
[deutschland](https://github.com/bundesAPI/deutschland) gefunden, welche
eine Reihe deutscher Datenquellen kapselt und in einem festen Format zur
Verfügung stellt. Unter anderem beinhaltet diese Bibliothek auch ein
Sub-Paket namens *bundesanzeiger*, welches ein Interface zur Oberfläche
des Bundesanzeigers zur Verfügung stellt. Das Projekt wird unter der
[Apache License
2.0](https://github.com/bundesAPI/deutschland/blob/main/LICENSE)
betrieben und ist folglich auch für den Einsatz im Projekt geeignet.
Die erste Integration der Bibliothek zeigt jedoch schnell eine große
Schwäche auf: Im Zuge des Datenbezugs werden die Rohdaten (HTML) bereits
in einfachen Text übersetzt. Dies hat zur Folge, dass wertvolle
Informationen, die sich aus der Struktur des Dokumentes ergeben (z.B.
der Einsatz von Tabellen, um die Aktiva wie Passiva des
Jahresabschlusses aufzulisten). Es wird daher ein [Fork des
Repositories](https://github.com/TrisNol/deutschland/tree/feat/bundesanzeiger-raw-report)
angelegt und der Quellcode so adaptiert, dass auch der Inhalt in seiner
Ursprungsform bereitgestellt wird.
Der damit verbundene Aufwand, um sich in die Bibliothek einzuarbeiten,
gibt hilfreiche Einblicke in die genaue Funktionsweise der
Implementierung: Über eine Reihe gezielter HTTP-Anfragen gekoppelt mit
einer Auswertung über Web Scraping navigiert die Bibliothek durch die
Seiten des Bundesanzeigers. Wird für eine Seite (z.B. Jahresabschlüsse)
ein CAPTCHA - ein Authentifizierungs-Mechanismus im
Challenge-Response-Verfahren, der den Zugang durch Bots verhindern soll
[5] - abgefragt, so wird dieser mittels eines Machine Learning
Models des Chaos Computer Club e.V. gelöst.
Auf Basis dieser Grundlage lassen sich unter anderem Informationen wie
die Angabe des Wirtschaftsprüfers eines Jahresabschlusses extrahieren:
```python
def extract_auditor_company(self, report: str) -> str | None:
soup = BeautifulSoup(report, features="html.parser")
temp = soup.find_all("b")
for elem in temp:
br = elem.findChildren("br")
if len(br) > 0:
return elem.text.split("\n")[1].strip()
return None
def extract_auditors(self, report: str) -> list:
auditor_company = self.extract_auditor_company(report)
auditor_regex = r"[a-z A-Z,.'-]+, Wirtschaftsprüfer"
hits = re.findall(auditor_regex, report)
return [
Auditor(hit.replace(", Wirtschaftsprüfer", "").lstrip(), auditor_company)
for hit in hits
]
```
Diese Methoden werden auf eine anhand des Dokumententyps gefilterte
Liste der bezogenen *Reports* angewandt. Eine Erweiterung auf die
Extraktion von Finanzdaten steht aus.
## Extraktion von News Artikeln
Auch wenn das Ziel des ETL-Prozesses eine MongoDB - also eine NoSQL
Document Datenbank, die in einer Collection Dokumente jeglicher Struktur
akzeptiert - ist, wird vor der Entwicklung ein Datenformat definiert.
Dies soll gewährleisten, dass Folge-Komponenten wie die
Sentiment-Analyse auf eine feste Definition der Daten zurückgreifen
können. Letztlich soll so die Codequalität und Stabilität erhalten
werden, da ein nicht-typisiertes Format zu Fehlern führt. Vor der
Entwicklung der Komponenten, um Daten aktueller News Artikel zu
beziehen, wird daher das folgende Zielformat als Python DataClass
definiert und in einem zentralen *models/* Sub-Modul des Repos zur
Verfügung gestellt:
```python
@dataclass
class News:
id: str
title: str
date: str
text: str
source_url: str
def to_dict(self) -> dict:
return asdict(self)
Neben dem Inhalt des Artikels in Form des Titels, sowie dem Text werden
wertvolle Meta-Informationen wie das Veröffentlichungsdatum sowie der
Link des Artikels mitgeführt. So wird der späteren Analyse die
Möglichkeit gegeben, ihre Ergebnisse auch in den zeitlichen Kontext zu
legen und im Zuge der Visualisierung Nutzer auf den ursprünglichen
Artikel zu navigieren. Des Weiteren wird eine ID als eindeutiger
Identifier mitgeführt, um eindeutige Referenzen zu ermöglichen und
Duplikate zu verhindern. Die Werte des Feldes orientieren sich an den
IDs der jeweiligen Quelle - unter anderem Universally Unique Identifier (UUIDs). Die Anreicherung der Datensätze mit den
betroffenen Unternehmen sowie dem Sentiment erfolgt in der
nachgelagerten Analyse.
### RSS Feeds
Im Zuge der Vorbereitung des Projektes wurden RSS (Rich Site Summary) Feeds als mögliche Quelle für eine
regelmäßige Bereitstellung aktueller Nachrichten-Artikel und Events
identifiziert. Ein RSS Feed verfolgt das Ziel Informationen
regelmäßig über eine Web-Schnittstelle an Kunden zu liefern. Die Inhalte
bzw. deren Änderungen werden dabei in einem standardisierten,
maschinen-lesbaren Format zur Verfügung gestellt. [6]
Der genaue Aufbau eines RSS Feeds wird im von der Harvard Law School
entwickelten RSS 2.0 Standard beschrieben. Er beschreibt zunächst XML
als unterliegendes Dateiformat vor. Die Wurzel eines RSS Dokumentes wird
von einem sog. *Channel* gebildet, der in sich mehrere *Items* kapselt.
Neben dem Titel, Link des Feeds muss ein *Channel* eine Beschreibung
enthalten - weitere Meta-Informationen wie die Sprache oder
Informationen zum Urheberrecht sind möglich. Ein *Item* hingegen setzt
sich aus mindestens dem Titel, Link, Beschreibung, Autoren, Kategorie
\[\...\] zusammen. [7] Ein simpler RSS Feed könnte wie folgt
aussehen:
```xml
<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>W3Schools Home Page</title>
<link>https://www.w3schools.com</link>
<description>Free web building tutorials</description>
<item>
<title>RSS Tutorial</title>
<link>https://www.w3schools.com/xml/xml_rss.asp</link>
<description>New RSS tutorial on W3Schools</description>
</item>
<item>
<title>XML Tutorial</title>
<link>https://www.w3schools.com/xml</link>
<description>New XML tutorial on W3Schools</description>
</item>
</channel>
```
Bezogen auf die Anbindung an das Transparenzregister fällt auf, dass
diese Quelle zunächst nur Basis-Informationen des Artikels und noch
nicht den eigentlichen Inhalt, welcher für die Sentiment-Analyse sowie
Ermittlung betroffener Unternehmen besonders interessant sein könnte,
enthält.
Als erste Quelle wird das
[Handelsblatts](https://www.handelsblatt.com/rss-feeds/) gewählt. Es
stellt eine Reihe verschiedener Feeds zur Verfügung, die sich mit
unterschiedlichen Themen beschäftigen. Für eine Auswertung von
Unternehmens-Daten wird daher anfangs der RSS Feed
[Schlagzeilen](https://www.handelsblatt.com/contentexport/feed/schlagzeilen)
eingesetzt.
Eine Abfrage dieser Daten gestaltet sich aufgrund der standardisierten
XML Struktur über eine gezielte HTTP-Anfrage und nachträgliche
Auswertung des eigentlichen Artikels recht einfach:
```python
import requests
import xmltodict
from bs4 import BeautifulSoup
class HandelsblattRSS:
def __init__(self):
self.base_url = "https://www.handelsblatt.com/contentexport/feed"
def get_news_for_category(self, category: str = "unternehmen") -> dict:
url = f"{self.base_url}/{category}"
result = requests.get(url=url)
if result.status_code == 200:
return xmltodict.parse(result.text)["rss"]["channel"]["item"]
return None
def get_news_details_text(self, url: str) -> dict:
content = requests.get(url)
soup = BeautifulSoup(content.text, features="html.parser")
return " ".join(
[elem.text.replace("\n", " ") for elem in soup.find_all("p")][:]
)
Die eigentlichen Inhalte des Artikels weisen kein Standard Format mehr
auf, auch wenn diese HTML basiert sind, und müssen daher mit einer
Bibliothek wie BeautifulSoup verarbeitet werden. Im obigen Code werden
dabei lediglich alle Textinhalte ausgewertet und konkateniert.
Mit der obigen Python-Klasse können jedoch lediglich Daten extrahiert
werden, die Transformation sowie das Laden in die Staging DB erfordern
weitere Entwicklung.
```python
handelsblatt = HandelsblattRSS()
items = handelsblatt.get_news_for_category()
from datetime import datetime
from tqdm import tqdm
import pandas as pd
news = []
for news_article in tqdm(items):
info = {
"id": news_article["guid"],
"title": news_article["title"],
"date": datetime.strptime(
news_article["pubDate"], "%a, %d %b %Y %H:%M:%S %z"
).strftime("%Y-%m-%dT%H:%M:%S%z"),
"source_url": news_article["link"],
"text": handelsblatt.get_news_details_text(news_article["link"])
}
news.append(info)
```
### Tagesschau REST API
Als weitere Quelle für News Artikel wird die [Tagesschau
API](https://tagesschau.api.bund.dev/) eingesetzt. Diese wird von
bund.dev - einer Bemühung Verwaltungsdaten und Informationen über APIs
zur Verfügung zu stellen[8], bereitgestellt und ermöglicht
Anwendern einen maschinen-lesbaren Zugang zu Artikeln der Tagesschau.
Obwohl der Zugang zu diesen Daten ebenfalls über HTTP GET Anfragen
möglich ist, so entspricht das Datenformat keinem Standard, sondern dem
Produkt-eigenen Datenschema, welches vom Betreiber mit
[Swagger](https://swagger.io/) dokumentiert wurde, und erfordert daher
eine andere Aufbereitungs-Routine. Der Abruf der Daten bleibt jedoch
simpel und eine gezielte Suche nach Kategorien ist möglich.
```python
import json
import requests
from bs4 import BeautifulSoup
class TagesschauAPI:
def __init__(self):
self.base_url = "https://www.tagesschau.de/api2"
def get_news_for_sector(self, sector: str) -> dict:
url = f"{self.base_url}/news/"
regions = ",".join([str(i) for i in range(1, 16)])
result = requests.get(url=url, params={"regions": regions, "ressort": sector})
return result.json()
def custom_search(self, query: str) -> dict:
url = f"{self.base_url}/search/"
result = requests.get(url=url, params={"searchText": query})
return result.json()
def get_news_details_text(self, url: str) -> dict:
content = requests.get(url)
soup = BeautifulSoup(content.text, features="html.parser")
return " ".join(
[elem.text.replace("\n", " ") for elem in soup.find_all("p")][1:]
)
```
Auch hier werden erneut Details über den ursprünglichen Artikel bezogen.
Dafür wird der HTML-Inhalt der ursprünglichen Seite mit BeautifulSoup
geparsed und jeglicher Text des Dokumentes extrahiert.
## Orchestrierung & Operations
Sobald die obigen Lösungen das Entwicklungsstadium verlassen und
folglich eine Ausführungsumgebung benötigen, stellen sich die folgenden
Fragen:
- Wie kann die Ausführung der Anwendungen geplant und zeitbasiert
gestartet werden? (Scheduling)
- Wie lässt sich ein Monitoring auf die Ausführungsumgebung anwenden?
(Transparenz)
- Wie lässt sich die Ausführungsumgebung um weitere Anwendungen
erweitern? (Skalierbarkeit)
Zum einen wird ein Scheduling benötigt, um die Extraktion aktueller News
Artikel in festgelegten Intervallen (z.B. täglich) durchzuführen. Des
Weiteren muss es möglich sein, umfassende Informationen über diese
Ausführung zu erhalten. Da der Nutzer nicht vor dem Monitor sitzt, das
Skript manuell startet und seine Ausführung überwacht. Folglich ist es
schwierig im Nachhinein nachzuvollziehen, woran die Ausführung
gescheitert ist und wann dies geschah. Zuletzt muss die Lösung in der
Lage sein, auch weitere Anwendungen aufzunehmen, da auch Prozesse wie
die Extraktion der Stammdaten oder der Finanzdaten in Intervallen - wenn
auch größer, da Jahresabschlüsse immerhin nur jährlich anfallen -
ausgeführt werden können.
Eine einfache Lösung ist der Einsatz vom in Unix integrierten Cron
Dienst. Dieser ermöglicht den zeitbasierten Start von Skripten oder
Anwendungen. [9]
```sh
0 8 * * * /var/scripts/daily_cron.sh
```
Problematisch gestaltet sich beim Cron jedoch die Bereitstellung von
Fehlermeldungen sowie die Kapselung von Ausführungsumgebungen, welches
vor allem bei Python Anwendungen von hoher Bedeutung ist, damit es nicht
zu kollidierenden Abhängigkeiten kommt.
Eine in der Industrie verbreitete Lösung stellt Apache Airflow dar.
<img src="Abbildungen/Apache_Airflow.PNG" height=450 />
Wie in der obigen Grafik dargestellt, setzt sich eine Airlflow Instanz
aus verschiedenen Sub-Komponenten zusammen. Neben einem Web Server, der
eine grafische Bedienoberfläche sowie dazugehörige
API-Endpunkte
stellt, besteht der Kern aus dem Scheduler sowie Executor und der
dazugehörigen Datenbank. Anwender legen in Form eines
*Directed Acyclic Graphs (DAG)* einen
Anwendungsstrang an, der über den Scheduler aufgerufen und im Executor
sowie seinen *Workern* - also dedizierten Ausführungsumgebungen -
letztlich gestartet wird. Besonders in einer auf Kubernetes
bereitgestellten Instanz, der administrative Rechte über das Cluster
gegeben wurden, kann Airflow eigenständig für jede Ausführung einen
temporären Pod hochfahren, der somit die jeweiligen
DAGs komplett
isoliert ausführt und nach Ausführung wieder terminiert, um Ressourcen
zu sparen. Zusätzlich besteht die Möglichkeiten den Quellcode der
DAGs automatisch
über einen *Git-sync* Mechanismus mit einem Git Repo zu verbinden. So
kann neuer Code automatisch auf die Instanz übertragen werden, ohne dass
dieser manuell auf den Server kopiert werden muss. [10]
Zunächst wird im Rahmen des Transparenzregisters jedoch ein simples
Deployment auf Basis von Docker Compose bereitgestellt. Das
Deployment besteht aus den Kern-Komponenten sowie einem einzelnen
Worker. So gehen zwar die flexiblen Möglichkeiten des Hochfahrens
individueller Pods verloren, jedoch ist die Einrichtung umso einfacher.
Um die Ausführung der Extraktion der News Artikel zu automatisieren,
wird der Quellcode in das Format eines DAG überführt und an den Airflow Server
übertragen.
Aus Sicherheitsgründen wurden die Zugangsdaten zur StagingDB in die
Airflow Variables überführt, so dass die genauen Daten nicht im
Quellcode ersichtlich sind sowie zentral verwaltet werden können, um
schnell zwischen verschiedenen Ziel-Datenbanken wechseln zu können.
Diese werden in der grafischen Oberfläche durch den Admin Account unter
dem Menüpunkt *Admin/Variables* bzw. dem Link */variable* verwaltet.
![Verwaltungsoberfläche von Variablen in
Airflow](Abbildungen/airflow_variables.PNG)
Über den Einsatz des *\@task.virtualenv* Decorators wird sichergestellt,
dass jede Methode ihr eigenes Virtual Environment nutzt und so keine
Kollision von Python Abhängigkeiten auftreten können. Über das Argument
*schedule=\
\"@daily"* wird der DAG auf eine tägliche Ausführung terminiert.
Der Startzeitpunkt (*start_date*) muss vor dem aktuellen Datum liegen,
damit der Scheduler sofort agiert. Nach der Definition der einzelnen
Schritte, wird die Hauptlogik in Form eines geordneten Aufrufs der eben
definierten Methode und Übergabe der nötigen Parameter definiert.
## Rechtliche Rahmenbedingungen
Der Einsatz von Web Scraping / Crawling, um die Daten der vorgestellten
Quellen zu beziehen, lässt sich auf verschiedene Bereiche geltenden
Rechts verweisen, um zu bewerten auf welche weiteren Rahmenbedingungen
Rücksicht genommen werden muss.
Zum einen ist das Urheberrecht der Daten zu berücksichtigen. Da es sich
bei dem Projekt um nicht-kommerzielle Forschung handelt, ist ein
Scraping grundsätzlich gemäß §60d des Urheberrechtsgestzes (UrhG) erlaubt. Jedoch ist darauf zu achten,
dass Mechanismen wie die *robots.txt* sowie weitere technische Hürden -
etwa IP-Sperren - nicht umgangen werden. Außerdem darf dem Betreiber der
Website durch die erzeugte Last keine Schäden beigefügt
werden. [11]
Des Weiteren ist besonderes Augenmerk auf den Datenschutz zu legen. Da
im Transparenzregister eine Reihe von personenbezogenen Daten wie Namen,
Geburtsdaten und Wohnort sowie Firmenzugehörigkeiten verarbeitet werden,
muss sichergestellt sein, dass Richtlinien der
Datenschutzgrundverordnung (DSGVO) eingehalten
werden. Auch wenn es sich bei den verarbeiteten Daten um frei
zugängliches Material handelt, muss dennoch ein Rahmenwerk aufgebaut
werden, welches die Ziele des Projektes klar absteckt, um seinen
Forschungscharakter zu wahren, und ggf. entsprechende Maßnahmen wie etwa
ein zeitlich begrenzte Speicherdauer und Zugangsbeschränkung
implementiert werden. [11]
## Zusammenfassung
Im Rahmen der Hausarbeit wurden zunächst die theoretischen Grundlagen
bestehend aus dem Web Scraping und Crawling sowie dem
ETL-Prozess
vermittelt. Darauf folgte die Vertiefung dieser Themen anhand der
praktischen Umsetzung in der Data Extraktion für das Transparenzregister
aus unterschiedlichen Quellen. Zu diesen zählten unterschiedliche
Provider für News Artikel wie etwa der RSS Feed des Handelsblattes sowie die
Tagesschau REST API der *bund.dev* Gruppe. Darauf folgend wurden
Unternehmens Stammdaten aus dem Unternehmensregister und Bewegungsdaten
aus dem Bundesanzeiger bezogen. Zuletzt folgte die Ermittlung einer
geeigneten Ausführungsplattform in Form von Apache Airflow sowie das
Deployment der ersten DAGs auf dieser. Rechtliche
Rahmenbedingungen, die die weitere Entwicklung beeinflussen werden,
wurden am Schluss dargestellt.
### Fazit
Beim Bezug von Daten aus staatlichen Quellen zeigte sich schnell, dass
noch keine vollends in APIs überführte Lösungen vorliegen. So gestaltete
sich die Exktaktion von Daten hier im Vergleich zu den News Artikeln
deutlich komplizierter und vor allem fehleranfälliger. Sollte sich die
Struktur der Website nur im geringsten ändern - zum Beispiel indem
Buttons umbenannt oder komplett ersetzt werden - so wird die Anwendung
nicht mehr korrekt agieren können. Da es sich ohnehin um öffentliche
Daten für den allgemeinen Zugriff handelt, wäre hier eine anständige
Lösung auf Seiten der Betreiber wünschenswert.
Der Einsatz von Apache Airflow stellte sich in dem gegebenen Aufbau als
fehleranfällig heraus. Dadurch, dass die Instanz nur über einen einzigen
Worker verfügt, der sich jedoch von verschiedenen Anwendungen geteilt
wird, die dennoch unterschiedliche Abhängigkeiten einsetzen müssen, war
das Aufsetzen der DAGs deutlich komplizierter und auch der
Einsatz von Bibliotheken, um Duplikate zu vermeiden, konnte dem Worker
nur schwer beigebracht werden. Hier wurde deutlich, dass - trotz
komplizierteren initialen Setups - ein auf Kubernetes und dem
*KubernetesPodOperator* basiertes Deployment hier eleganter wäre.
Dennoch ist das Monitoring von Airflow im Vergleich zu anderen Lösungen
zu loben, so dass ein weiterer Einsatz geplant ist.
### Ausblick
Nun gilt es die entwickelten Puzzle-Stücke zusammenzusetzen und die
StagingDB mit immer mehr Daten zu versorgen. Da News Artikel nun
regelmäßig eingespeist werden und solange sich an den APIs der Betreiber
nichts ändert auch keine Code Änderungen erforderlich sind, wird der
Fokus klar auf dem Beziehen weiteren Stamm- und Bewegungsdaten der
Unternehmen liegen, um so die weiteren Prozesse wie die
Verflechtugnsanalyse und Datenvisualisierung mit Informationen zu
versorgen. Sollten sich im Laufe der Entwicklung weitere News-Quellen
als wertvoll erweisen, kann auf der bestehenden Code-Basis schnell eine
weitere Integration geschaffen werden.
Des Weiteren mag der im vorherigen Absatz angesprochene Vorschlag,
Airflow auf Kubernetes aufzusetzen, eine sinnvolle Ergänzung sein. In
Zukunft werden auch Finanzdaten in regelmäßigen Abständen vom
Bundesanzeiger bezogen werden müssen, so dass die Investition in eine
besser skalierbare Lösung wertvoll wäre.
Ein weiterer indirekter Vorteil von Airflow ist, dass Entwickler dazu
gezwungen sind, ihren Code möglichst modular zu halten, um so die
Ausführungsumgebung zu abstrahieren. Der Einsatz von Airflow und das
Verpacken des Codes in einen DAG sollte lediglich bestehende
Klassen/Methoden verwenden und in die richtige Reihenfolge bringen bzw.
verknüpfen. Der Aufbau einer klaren Interface-Struktur und guten
Teststrategie ist daher sehr wichtig. Der damit verbundene Aufwand muss
entsprechend eingeplant werden.
## Anhang
### Company Datenmodell
![Datenstruktur der Unternehmensdaten auf der
StagingDB](Abbildungen/UML_CompanyDaten.PNG)
### Ablauf Web Scraping im Unternehmensregister
![Ablaufdiagramm des Web Scrapings des
Unternehmensregisters](Abbildungen/ablaufdiagramm_unternehmensregister.PNG)
### Airflow DAG
```python
import datetime
from airflow.decorators import dag, task
@dag("handelsblatt_news_dag", start_date=datetime.datetime(2000, 1, 1), schedule="@daily", catchup=False)
def main():
@task.virtualenv(system_site_packages=False, requirements=['requests', 'bs4', 'xmltodict'])
def fetch_data():
from datetime import datetime
from transparenzregister.utils.handelsblatt import HandelsblattRSS
handelsblatt = HandelsblattRSS()
items = handelsblatt.get_news_for_category()
data = []
for article in items:
info = {
"id": article["guid"],
"title": article["title"],
"date": datetime.strptime(
article["pubDate"], "%a, %d %b %Y %H:%M:%S %z"
).strftime("%Y-%m-%dT%H:%M:%S%z"),
"source_url": article["link"],
"text": handelsblatt.get_news_details_text(article["link"]),
}
data.append(info)
return data
@task.virtualenv(requirements=["pymongo"], task_id="Process_data")
def process_data(list_news: list) -> list:
from airflow.models import Variable
from transparenzregister.utils.mongo import MongoConnector, MongoNewsService
from transparenzregister.models.news import News
connector = MongoConnector(
hostname=Variable.get("mongodb_host"),
database="transparenzregister",
username=Variable.get("mongodb_username"),
password=Variable.get("mongodb_password"),
port=None
)
service = MongoNewsService(connector)
num_inserted = 0
for info in list_news:
article = News(**info)
if service.get_by_id(article.id) is None:
service.insert(article)
num_inserted += 1
return num_inserted
raw_data = fetch_data()
inserted_entries = process_data(raw_data)
print(f"Number of inserted entries: {inserted_entries}")
return inserted_entries
news_dag = main()
### Literaturverzeichnis
- **[1]** Samanpour, A. R. (2016). *Studienunterlagen: Business Intelligence 1* (1. Auflage). Wissenschaftliche Genossenschaft Südwestfalen eG. Iserlohn.
- **[2]** IONOS. (2020). *Was ist Web Scraping?*. [Link](https://www.ionos.de/digitalguide/websites/web-entwicklung/was-ist-web-scraping/) (Zugriff am: 16. September 2023).
- **[3]** IONOS. (2016). *Indexierungsmanagement mit der robots.txt*. [Link](https://www.ionos.de/digitalguide/hosting/hosting-technik/indexierungsmanagement-mit-der-robotstxt/) (Zugriff am: 16. September 2023).
- **[4]** Bundesanzeiger Verlag GmbH. *Unternehmensregister*. [Link](https://www.unternehmensregister.de/) (Zugriff am: 24. September 2023).
- **[5]** Google. *Was ist CAPTCHA?*. [Link](https://support.google.com/a/answer/1217728?hl=en) (Zugriff am: 3. Oktober 2023).
- **[6]** Markgraf, D. P. (2018). *RSS-Feed*. Springer Fachmedien Wiesbaden GmbH. [Link](https://wirtschaftslexikon.gabler.de/definition/rss-feed-53722/version-276790) (Zugriff am: 16. September 2023).
- **[7]** Harvard Law. (2014). *RSS 2.0*. [Link](https://cyber.harvard.edu/rss/) (Zugriff am: 16. September 2023).
- **[8]** bundDEV. *Wir dokumentieren Deutschland*. [Link](https://bund.dev/) (Zugriff am: 16. September 2023).
- **[9]** ubuntu Deutschland e.V. (2023). *Cron*. [Link](https://wiki.ubuntuusers.de/Cron/) (Zugriff am: 1. Oktober 2023).
- **[10]** The Apache Software Foundation. *Apache Airflow*. [Link](https://airflow.apache.org/docs/apache-airflow/2.7.1/index.html) (Zugriff am: 1. Oktober 2023).
- **[11]** Universität Hamburg - Fakultät für Wirtschafts- und Sozialwissenschaften. (2023). *Handreichung zur rechtskonformen Durchführung von Web-Scraping Projekten in der nicht-kommerziellen wissenschaftlichen Forschung*. [PDF Link](https://www.wiso.uni-hamburg.de/forschung/forschungslabor/downloads/20200130-handreichung-web-scraping.pdf) (Zugriff am: 2. Oktober 2023).

View File

@ -39,14 +39,18 @@ Diese sind, um Industriestandards zu entsprechen, auf Englisch gehalten.
:numbered:
Ergebnisse/Zwischenbericht_und_Praesentation/PhHo/dev-ops
Ergebnisse/Zwischenbericht_und_Praesentation/TrNo/Ausarbeitung.md
seminararbeiten/Datenspeicherung/00_Datenspeicherung
.. toctree::
:maxdepth: 3
:caption: Abschlussberichte
:numbered:
Ergebnisse/Abschlussbericht_und_Praesentation/PhHo/05-DEV-OPS
Ergebnisse/Abschlussbericht_und_Praesentation/TrNo/S4-2.md
Ergebnisse/Abschlussbericht_und_Praesentation/TrNo/S4-3-1.md
Ergebnisse/Abschlussbericht_und_Praesentation/PhHo/4-4-2-database-generator

Binary file not shown.

Before

Width:  |  Height:  |  Size: 343 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB