# Initialschätzung von Kurswechselpositionen eines Segelboots auf einer Karte anhang con Wind, Start und Zielpunkt

## Motivation

Ziel dieser Semester abschließenden schriftlichen Ausarbeitung im Fach "Maschine Learning" an der Fachhochschule Südwestfalen ist das Generieren einer Heatmap von Kurswechselpositionen eines Segelbootes zu einer Karte abhängig von Wind und der Zielpostion. Dies soll das Finden einer guten Route vereinfachen, indem die Qualität einer ersten Route, die danach über ein Quotientenabstiegsverfahren optimiert werden soll verbessern. Da ein solches Quotientenabstiegsverfahren sehr gerne in einem Lokalen minimum festhängt, müssen mehrere routen gefunden und optimiert werden. Hier soll untersucht werden, ob dies durch eine Ersteinschätzung der Lage durch KI verbessert werden kann.

Eingesetzt werden soll die so erstellte KI in dem Segelroboter des [Sailing Team Darmstadt e.V.](https://www.st-darmstadt.de/) Einer Hochschulgruppe an der TU-Darmstadt welche den ["roBOOTer"](https://www.st-darmstadt.de/ueber-uns/boote/prototyp-ii/) ein vollautonomes Segelboot welches eines Tages den Atlantik überqueren soll. [Eine technische Herausforderung welche zuerst von einem norwegischen Team erfolgreich abgeschlossen wurde](https://www.microtransat.org/).

## Inhaltsverzeichnis

1. Einleitung
 1.1. Situation
 
 1.2. Vorgehen zur unterstützenden KI

2. Vorbereitungen

 2.1. Imports
 
 2.2. Parameter und Settings
3. Szenarien und Routen Generieren
4. Daten betrachten und Filtern
5. KI Modell erstellen
6. Training
7. Analyse der KI
8. Ausblick
 

## Einleitung

### Situation

Eine Routenplanung für ein Segelboot hat ein Problem, welches man sonst so eher nicht kennt. Eine relativ freie Fläche auf der Sich das Schiff bewegen kann. Dies verändert die Wegfindung wie man sie von der Straße kennt fundamental.

Navigiert man auf Straßen, hat man zumindest nach einer ersten abstraction relativ wenige Freiheitsgrade für den Weg.
Die Richtung kann nur an Kreuzungen gewechselt werden und dort nur in Richtungen in die es Straßen gibt. Beim Segeln auf dem freien Meer ist jeder Ort ein potenzieller Wendepunkt von dem aus Potenziell in jede Richtung gesegelt werden kann.

Dennoch ist es oft auch ohne Hindernisse zwischen Boot und Ziel oft nicht möglich das Ziel direkt anzufahren das sich die Maximalgeschwindigkeiten relativ zur Windrichtung verändern.
Das folgende Diagramm zeigt die Segelgeschwindigkeiten an einem Katamaran.

"Ship

Da der roBOOTer anders als an Katamaran nicht auf Geschwindigkeit, sondern auf mechanische Belastbarkeit ausgelegt wurde hat der Fahrtwind einen geringeren einfluss auf das Fahrtverhalten des Segelboots dies und eine andere Maximalgeschwindigkeit sorgen für ein etwas anderes Fahrverhalten. Die ungefähre Form der Kurven trifft aber auch auf den roBOOTer zu. Man kann deutlich erkennen das auch, wenn man nicht direkt gegen den Wind fahren kann man schräg gegen den wind immer noch erstaunlich schnell ist.

Das aktuelle Verfahren zum Finden einer Route läuft folgendermaßen ab:

Eine direkte Route wird berechnet. Die Route wird an jedem Hindernisse geteilt und rechts und links um jedes hindernis herum gelegt. Bei folgenden hindernissen werden die Routen wieder geteilt somit erhält man $2^n$ Vorschläge für Routen wobei $n$ die Anzahl der Hindernisse auf der Route ist. Jeder Abschnitt der Route wird noch einmal zerteilt, um der Route mehr Flexibilität zu geben.

Die Routen werden dann simuliert, um die Kosten der Route zu berechnen. Die so simulierte Route wird danach über die Kosten in einem Gradientenabstiegsverfahren optimiert.

Das ganze oben beschriebene Verfahren ist relativ schnell sehr rechenaufwendig und findet nicht immer ein Ergebnis. Wird kein Ergebnis gefunden wird eine mehr oder weniger zufällige Route optimiert.

Diese Ausarbeitung soll wenigstens bei der alternativen Routenfindung helfen. Im idealfall kann es aber auch genutzt werden, um die auswahl der Routen um Hindernisse frühzeitig zu reduzieren und den Rechenaufwand unter $2^n$ zu senken wobei $n$ die Anzahl von Hindernissen auf der Route ist.

### Vorgehen zur unterstützenden KI

#### Eingaben und Ausgeben

Die Algorithm zur Wegfindung vom Sailing Team Darmstadt e.V. arbeiten intern mit Polygonen als Hindernissen. Diese werden durch die Shapely Bibliothek implementiert. Da eine variable Anzahl an Polygonen mit einer variablen Form und Position eine Relative komplexer Input muss dieser in eine normierte Form gebracht werden. Ein binärfärbens Bild ist dafür die einfachste Form.

Für den Computer spielen sowohl Zentrierung, Skalierung und Ausrichtung der Karte keine Rolle.
Wir rotieren also die Karte immer so das der Wind von *Norden* kommt und das Boot / die Startposition in der *Mitte* der Karte liegt. Da distanz Liner ist, wird davon ausgegangen das Scenario einfach skaliert passend skaliert werden kann.

Die nächste eingabe ist die Zielposition relativ zum Startpunkt. Diese kann entweder durch ein einzelnes Pixel in einem zweiten Farbkanal oder aber in abstrakterer Form an die KI übergeben werden.

Als ausgabe wird eine Heatmap erwartet. Zwei alternative Heatmaps sind relative einfach denkbar.

1. Eine Headmap der Kurswechselpositionen
2. Eine Headmap des Kursverlaufes

Headmaps sind in gewisser Weise Bilder. Das Problem wird daher wie ein Bild zu Bild KI Problem betrachtet. Diese werden normalerweise durch ANNs gelöst.

Um eine ANN zu trenntieren gibt es immer die Wahl zwischen drei Primären prinzipien. Dem unüberwachten Lernen, dem reinforcement Learning und dem überwachten Lernen. Letzteres ist dabei meist am einfachsten wenn auch nicht immer möglich.

Der Wegfindealgorithmus des Sailing Team Darmstadt e.V. ist zwar noch in der Entwicklung, funktioniert aber hinreichend gut, um auf einem normalen PC Scenarios mit Routen zu paaren oder auch diese zu *labeln*, um beim KI lingo zu bleiben. Um anpassungsfähig an andere Scenarios zu sein wird eine große Menge unterschiedlicher Scenarios und Routen benötigt.
Da das Haupteinsatzgebiet das Meer ist gehen wir von einer Insellandschaft oder Küstenlandschaft aus.

Zum Finden von Scenarios gibt es zwei Möglichkeiten.

1. Das Auswählen von umgebungen von der Weltkarte und das Bestimmen eines Zielpunktes.
2. Das Generieren von künstlichen Scenarios.
 
Hier wird die Annahme getroffen das sich ANNs von einem Datensatz auf dem anderen Übertragen lassen.
Der Aufwand für künstliche Scenarios wird hierbei als geringer eingestuft und daher gewählt.

## Vorbereitungen

Folgende Python Bibliotheken werden verwendet:

1. `Tensorflow`
 Die `Tensorflow` Bibliothek ist das Werkzeug welches verwendet wurde, um neuronale Netz zu modellieren, zu trainieren, zu analysieren und auszuführen.

2. `pyrate`
 Die `Pyrate` Bibliothek ist Teil des ROS Operating Systems, welches den roBOOTer betreibt. Kann Routen zu Scenarios finden.

3. `Shapley`
 Die `shapley` Bibliothek wird genutzt, um geometrische Körper zu generieren, zu mergen und an den Roboter zum Labeln weiterzugeben.

4. `pandas`
 Die `pandas` Bibliothek verwaltet, speichert und analysiert daten.

5. `numpy`
 Eine Bibliothek um Mathematische operations an multidimensionalen Arrays auszuführen.

6. `matplotlib`
 Wird genutzt um Diagramme zu plotted.

6. `PIL`
 Eine Library um Bilder manuell zu zeichnen.

7. `humanize`
 Konvertiert Zahlen, Daten und Zeitabstände in ein für menschen einfach leserliches Format.

8. `tqdm`
 Fügt einen Fortschrittsbalken zu vielen Problemen hinzu.

#### Imports
Importiert die Imports the necessary packages from python and pypi.

In [1]:
import sys

# Pins the python version executing the Jupyter Notebook
assert sys.version_info.major == 3
assert sys.version_info.minor == 10

import os
from typing import Optional, Final, Literal
import glob
import pickle

from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import ImageDraw, Image
from shapely.geometry import Polygon, Point, LineString
from shapely.ops import unary_union
import tensorflow as tf
import humanize

Definiert den Pfad an dem das Jupyter Notebook ausgeführt werden soll.
Importiert die pyrate module. Wird nur ausgeführt, wenn innerhalb des Pyrate Containers ausgeführt.

In [None]:
# Import route generation if started in the docker container
if os.getenv("PYRATE"):
 %cd /pyrate/
 import experiments
 from pyrate.plan.nearplanner.timing_frame import TimingFrame

# Protection against multi exection
if not os.path.exists("experiments"):
 %cd ../

In [None]:
if os.getenv("PYRATE"):
 # Sets the maximum number of optimization steps that can be performed to find a route.
 # Significantly lowered for more speed.
 experiments.optimization_param.n_iter_grad = 50

 # Disables verbose outputs from the pyrate library.
 experiments.optimization_param.verbose = False

In [None]:
# The scale the route should lie in. Only a mathematical limit.
SIZE_ROUTE: Final[int] = 100

# The outer limit in with the goal need to be palced.
# Should be smaller than
SIZE_INNER: Final[int] = 75
assert SIZE_ROUTE > SIZE_INNER, "The goal should be well inside the limit placed "

# The minimum destance from the start that should
MIN_DESTINATION_DISTANCE: Final[int] = 25
assert (
 SIZE_INNER > MIN_DESTINATION_DISTANCE
), "The goal should be well closer to the outer limit the"

# The size the ANN input has. Equal to the image size. Should be an on of $n^2$ to be easier compatible with ANNs.
IMG_SIZE: Final[int] = 128

# The size an image should be in to be easily visible by eye.
IMG_SHOW_SIZE: Final[int] = 400

# The number of Files that should be read to train the ANNs
NUMBER_OF_FILES_LIMIT: Final[int] = 1000

#
NO_SHOW = False
GENERATE_NEW = True

## Szenarien und Routen Generieren

Um das neuronale Netz zu trainieren werden Datensätze benötigt. Für die Abschätzung der Routen wird eine Karte mit Hindernissen und eine zugehörige Route benötigt. Hier wurde die Designentscheidung getroffen die Karten nicht auszuwählen, sondern zu generieren.

### Generieren von Karten

Eine Karte ist für das Sailing Team Darstadt eine Mange von statischen und dynamischen Hindernissen. Statische Hindernisse sind Inseln, Landmassen und Untiefen und Fahrferbotszonen. Dynamische Hindernisse sind andere Teilnehmer am Schiffsverkehr und Wetterereignisse.
In dieser KI wird sich auf statische Hindernisse beschränkt. Daher ist eine Scenario eine Maenge an Hindernispoligonen.
Um das generieren der Poligone einfacher zu regeln und größere statistische Kontrolle über die den Generationsvorgang zu haben sind alle generierten Basispolinome als Abschnitte auf einem Umkreis definiert die Zufällig über die Karte verteilt werden.

Ein einzelnes Polygon wird hier Folgendermaßen generiert:
1. Die Anuzahl der Kanten/Ecken wird festgelegt.
2. Ein lognormal verteilter Radius wird zufällig ausgewählt.
3. Auf dem Radius werden n winkel abgetragen.
4. Die Winkel werden sortiert damit sich das Polygon nicht selbstschneidet.
5. Die durch Radius und Winkel entstehenden Punkte werden in das kartesische Koordinatesnsystem Umgewandelt.
6. Der zufällige Offset / Polygonmittelpunkt wird aufaddiert.
7. Aus den so generierten `np.ndarray` wird ein `shapely.geometry.Polygon` erstellt.

So wird eine Festgelegte Anzahl von Polygonen generiert.
Setzt man vor dem generieren des ersten Polygons eines Scenarios eine random seed über `np.random.seed` so erhält man zu jedem seed ein eindeutiges mänge an Polygonen wenn auch alle anderen Parameter übereinstimmen. Diese Polygonmänge hat nun mit hoher Warscheinlichkeit überlappende Polygone. Dies ist für den Algorithmus des Sailing Teams Darmstadt e.V. ein Problem. Die Shaeply libraray besitzt eine Union function die vereinigungsmängen von Polygonen bildet wenn möglich. So erhält man eine reduzierte mänge an Polygonen. Diese kann später an einen Solver übergeben werden.

In [None]:
# https://stackoverflow.com/questions/16444719/python-numpy-complex-numbers-is-there-a-function-for-polar-to-rectangular-co
def polar_to_cartesian(
 radii: np.ndarray,
 angles: np.ndarray,
):
 """Transforms polar coordinates into cartesian coordinates.

 Args:
 radii: A array of radii.
 angles: A array of angles.

 Returns:
 An array of cartesian coordinates.
 """
 return radii * np.exp(2j * angles * np.pi)

In [None]:
def random_polygon(
 radius_mean: float = 2,
 radius_sigma: float = 1.5,
):
 """Generates the simplest of polygons, a triangle with a size described by a random polygon.

 Args:
 radius_mean: The average radius defining a circumcircle of a triangle.
 radius_sigma: The variance of a radius defining a circumcircle of a triangle.

 Returns:
 A single triangle.
 """
 number_of_corners = np.random.randint(3, 10)
 array = polar_to_cartesian(
 np.random.lognormal(radius_mean, radius_sigma),
 np.sort(np.random.rand(number_of_corners)),
 )
 offset = np.random.randint(low=-SIZE_ROUTE, high=SIZE_ROUTE, size=(2,))
 return_values = np.zeros((number_of_corners, 2), dtype=float)
 # return_values[1, :] = np.real(offset)
 return_values[:] = offset
 return_values[:, :] += np.array((np.real(array), np.imag(array))).T
 return Polygon(return_values)
 # return np.array( + offset[0], np.imag(array) + offset[1])


np.random.seed(42)
random_polygon()

In [None]:
def generate_obstacles(
 seed: Optional[int] = None,
 number_of_polygons: int = 40,
 radius_mean: float = 2,
 radius_sigma: float = 1,
) -> dict[str, Polygon]:
 """Generates a set of obstacles from a union of triangles.

 The union of triangles meas that if polygons overlap o polygon containing the union of those polygons is returned.
 Args:
 seed: A seed to generate a set of obstacles from.
 number_of_polygons: The number of polygons that should be drawn.
 radius_mean: The average radius defining a circumcircle of an obstacle triangle.
 radius_sigma: The variance of a radius defining a circumcircle of an obstacle triangle.

 Returns:
 A list of unified obstacles.
 """
 if seed is not None:
 np.random.seed(seed)
 polygons = []
 for _ in range(number_of_polygons):
 poly = random_polygon(radius_mean, radius_sigma)
 if poly.contains(Point(0, 0)):
 continue
 if poly.exterior.distance(Point(0, 0)) < 1:
 continue
 polygons.append(poly)
 polygon_list = list(unary_union(polygons).geoms)
 return {str(i): p for i, p in enumerate(polygon_list)}

In [None]:
def generate_destination(
 obstacles: dict[str, Polygon],
 seed: Optional[int] = None,
) -> Point:
 """Generates for a map.

 Can be used to generate a valid destination for list of obstacles.
 Args:
 obstacles: A list of obstacles.
 seed: The seed determining the point.

 Returns:
 A goal that should be reached by the ship.
 """
 # sets the seed
 if seed is not None:
 np.random.seed(seed)

 # generates the point
 point: Optional[Point] = None
 while (
 point is None
 or abs(point.x) < MIN_DESTINATION_DISTANCE
 or abs(point.y) < MIN_DESTINATION_DISTANCE
 or any(obstacle.contains(point) for obstacle in obstacles.values())
 ):
 point = Point(np.random.randint(-SIZE_INNER, SIZE_INNER, size=(2,), dtype=int))
 return point


print(generate_destination(generate_obstacles(42), 42))

In [None]:
def plot_situation(
 obstacles: dict[str, Polygon],
 destination: Point,
 obstacle_color: str | None = None,
 route=None,
 legend: bool = True,
 title: str | None = None,
) -> None:
 """PLots the obstacles into a matplotlib plot.

 Args:
 obstacles: A list of obstacles.
 destination: The destination that should be reached by the boat.
 obstacle_color: The color the obstacles should have. Can be None.
 If none all obstacles will have different colors.
 route: The route that should be plotted.
 legend: If true plots a legend.
 title: The title of the plot.
 Returns:
 None
 """
 # x.figure(figsize=(8, 8))
 # plt.axis([70.9481331655341 - 5, 70.9481331655341 + 5, 43.24219045432384-5, 43.24219045432384+5])
 plt.axis([-SIZE_ROUTE, SIZE_ROUTE, -SIZE_ROUTE, SIZE_ROUTE])

 # Sets a title if one is demanded
 if title:
 plt.title(title)

 # Plots the obstacles.
 if obstacles:
 for polygon in obstacles.values():
 if obstacle_color is not None:
 plt.fill(*polygon.exterior.xy, color=obstacle_color, label="Obstacle")
 else:
 plt.fill(*polygon.exterior.xy)

 # Plots the wind direction
 # https://www.geeksforgeeks.org/matplotlib-pyplot-arrow-in-python/
 plt.arrow(
 0,
 +int(SIZE_ROUTE * 0.9),
 0,
 -int(SIZE_ROUTE * 0.1),
 head_width=10,
 width=4,
 label="Wind (3Bft)",
 )

 if route is not None:
 if isinstance(route, np.ndarray):
 plt.plot(route[:, 0], route[:, 1], color="BLUE", marker=".")
 else:
 if isinstance(route, TimingFrame):
 plt.plot(
 route.points[:, 0], route.points[:, 1], color="BLUE", marker="."
 )
 else:
 raise TypeError()

 # Plots the estimation
 if destination:
 plt.scatter(*destination.xy, marker="X", color="green", label="Destination")
 plt.scatter(0, 0, marker="o", color="green", label="Start")

 if legend:
 # https://stackoverflow.com/questions/13588920/stop-matplotlib-repeating-labels-in-legend
 handles, labels = plt.gca().get_legend_handles_labels()
 by_label = dict(zip(labels, handles))
 plt.legend(by_label.values(), by_label.keys())
 return None

In [None]:
if not NO_SHOW:
 plt.figure(figsize=(17.5, 25))
 for seed in tqdm(range(12)):
 plt.subplot(4, 3, seed + 1)
 generated_obstacles = generate_obstacles(seed)
 generated_destination = generate_destination(generated_obstacles, seed)
 route_generated = None

 # noinspection PyBroadException
 try:
 route_generated, _ = experiments.generate_route(
 position=Point(0, 0),
 goal=generated_destination,
 obstacles=generated_obstacles,
 wind=(18, 180),
 )
 except Exception:
 route_generated = None

 plot_situation(
 obstacles=generated_obstacles,
 destination=generated_destination,
 obstacle_color="RED",
 route=route_generated,
 title=f"Seed: {seed}, Cost: {route_generated.cost:.3f}"
 if route_generated
 else f"Seed: {seed}",
 legend=seed == 0,
 )
 plt.show()

In [None]:
def generate_image_from_map(
 obstacles: dict[str, Polygon],
 destination: Point,
 route=None,
 route_type: Literal["line", "dot"] = "dot",
) -> Image:
 """Generate an image from the map.

 Can be used to feed an ANN.
 - Obstacles are marked as reed.
 - The destination is marked as green.
 - The points where the route will likely change are blue.

 Args:
 obstacles: A dict of obstacles as shapely Polygons. Keyed as a string.
 destination: A destination that should be navigated to.
 route: The calculated route that should be followed.
 route_type: How the route is drawn. If 'line' is selected the complete route is selected.
 If 'dot' is selected the turning points a drawn in.
 """
 img = Image.new(
 "RGB",
 (IMG_SIZE, IMG_SIZE),
 "#000000",
 )
 draw = ImageDraw.Draw(img)
 for polygon in obstacles.values():
 draw.polygon(
 list(
 (np.dstack(polygon.exterior.xy).reshape((-1)) + SIZE_ROUTE)
 / (2 * SIZE_ROUTE)
 * IMG_SIZE
 ),
 fill="#FF0000",
 outline="#FF0000",
 )
 if os.getenv("PYRATE"):
 if isinstance(route, TimingFrame):
 route = route.points
 if route is not None:
 route = ((route + SIZE_ROUTE) / (2 * SIZE_ROUTE) * IMG_SIZE).astype(int)
 if route_type == "line":
 draw.line([tuple(point) for point in route], fill=(0, 0, 0xFF))
 elif route_type == "dot":
 for point in route[1:]:
 img.putpixel(point, (0, 0, 0xFF))
 else:
 raise ValueError("Route type unknown.")
 img.putpixel(
 (
 int((destination.x + SIZE_ROUTE) / (2 * SIZE_ROUTE) * IMG_SIZE),
 int((destination.y + SIZE_ROUTE) / (2 * SIZE_ROUTE) * IMG_SIZE),
 ),
 (0, 0xFF, 0),
 )
 return img

In [None]:
def generate_example_image(route_type: Literal["line", "dot"]):
 """
 Generates an example image with the seed 42.

 Args:
 route_type: How the route is drawn. If 'line' is selected the complete route is selected.
 If 'dot' is selected the turning points a drawn in.

 Returns:
 The example image.
 """
 obstacles = generate_obstacles(42)
 destination = generate_destination(obstacles, 42)
 try:
 route, _ = experiments.generate_route(
 position=Point(0, 0),
 goal=destination,
 obstacles=obstacles,
 wind=(18, 180),
 )
 except Exception:
 route = None
 return generate_image_from_map(
 obstacles=obstacles,
 destination=destination,
 route=route,
 route_type=route_type,
 )

In [None]:
generate_example_image(route_type="dot").resize(
 (IMG_SHOW_SIZE, IMG_SHOW_SIZE), Image.Resampling.BICUBIC
)

In [None]:
generate_example_image(route_type="line").resize(
 (IMG_SHOW_SIZE, IMG_SHOW_SIZE), Image.Resampling.BICUBIC
)

In [None]:
if not NO_SHOW:
 for seed in tqdm([42]):
 plt.figure(figsize=(8, 8))
 wind_dir = 180
 generated_obstacles = generate_obstacles(seed)
 generated_destination = generate_destination(generated_obstacles, seed)
 route_generated = None
 try:
 route_generated, _ = experiments.generate_route(
 position=Point(0, 0),
 goal=generated_destination,
 obstacles=generated_obstacles,
 wind=(18, wind_dir),
 )
 except Exception as e:
 route_generated = None
 plot_situation(
 obstacles=generated_obstacles,
 destination=generated_destination,
 obstacle_color="RED",
 route=route_generated,
 title=f"Seed: {seed}, Cost: {route_generated.cost:.3f}"
 if route_generated
 else f"Seed: {seed}",
 legend=seed == 0,
 )
 plt.show()

In [None]:
def generate_all_to_series(
 seed: Optional[int] = None, image: bool = False
) -> pd.Series:
 """Generates everything and aggregates all data into a `pd:Series`.

 Args:
 seed:The seed that should be used to generate map and destination.
 image: If an image should be generated or if that should be postponed to save memory.
 Returns:
 Contains a `pd.Series`containing the following.
 - The seed tha generated the map.
 - The destination in x
 - The destination in y
 - A list of Obstacle polygons.
 - The route generated for this map by the roBOOTer navigation system.
 - Optionally the image containing all the information.
 Can be generated at a later date without the fear for a loss of accuracy.
 """
 obstacles = generate_obstacles(seed)
 destination = generate_destination(obstacles, seed)

 try:
 route, _ = experiments.generate_route(
 position=Point(0, 0),
 goal=destination,
 obstacles=obstacles,
 wind=(18, wind_dir),
 )
 except Exception:
 route = None
 return pd.Series(
 data={
 "seed": str(seed),
 "obstacles": obstacles,
 "destination_x": destination.x,
 "destination_y": destination.y,
 "image": generate_image_from_map(obstacles, destination, route)
 if image
 else pd.NA,
 "route": route.points if route else pd.NA,
 "cost": route.cost if route else pd.NA,
 },
 name=str(seed),
 )

In [None]:
if not NO_SHOW:
 df = pd.DataFrame(
 [generate_all_to_series(i, image=False) for i in tqdm(range(2))]
 ).set_index("seed")
 df.to_pickle("test.pickle")
 df

https://programtalk.com/python-examples/PIL.ImageDraw.Draw.polygon/)
https://stackoverflow.com/questions/3654289/scipy-create-2d-polygon-mask

In [None]:
if os.getenv("PYRATE"):
 save_frequency = int(os.getenv("save_frequency", "50"))
 start_seed = int(os.getenv("seed_start", "0"))
 continues = bool(os.getenv("continues", "false"))

 files = glob.glob("data/*.pickle")
 seed_groups = {int(file[9:-7]) for file in files}
 for next_seeds in range(start_seed, 1_000_000, save_frequency):
 if next_seeds in seed_groups:
 continue
 print(f"Start generating routes for seed: {next_seeds}")
 tmp_pickle_str: str = f"data/tmp_{next_seeds:010}.pickle"
 pd.DataFrame().to_pickle(tmp_pickle_str)
 df = pd.DataFrame(
 [
 generate_all_to_series(i, image=False)
 for i in tqdm(range(next_seeds, next_seeds + save_frequency, 1))
 ]
 ).set_index("seed")
 pickle_to_file = f"data/raw_{next_seeds:010}.pickle"
 df.to_pickle(pickle_to_file)
 os.remove(tmp_pickle_str)
 if not continues:
 break

In [None]:
DATA_COLLECTION_PATH: Final[str] = "data/collected.pickle"
if os.path.exists(DATA_COLLECTION_PATH) and not GENERATE_NEW:
 collected_data = pd.read_pickle(DATA_COLLECTION_PATH)
else:
 collected_data = pd.concat(
 [
 pd.read_pickle(filename)
 for filename in tqdm(glob.glob("data/raw_*.pickle")[:NUMBER_OF_FILES_LIMIT])
 ]
 )
 number_of_maps = len(collected_data.index)
 print(f"{number_of_maps: 10} maps collected")
 collected_data.dropna(subset=["route"], inplace=True)
 number_of_routes = len(collected_data.index)
 print(f"{number_of_routes: 10} routes collected")
 collected_data.to_pickle(DATA_COLLECTION_PATH)
collected_data

# find and drop all routes that exit the map!

In [None]:
def check_route_in_bounds(route):

 # easier to debut in multiple lines
 if route is None:
 return False
 if route is pd.NA:
 return False
 if not isinstance(route, np.ndarray):
 return False
 if np.array(
 abs(route) > 100,
 ).any():
 return False
 return True


data_before = len(collected_data.index)

df_filter = collected_data["route"].mapply(check_route_in_bounds)
filtered = collected_data[~df_filter]
collected_data = collected_data[df_filter]

data_after = len(collected_data.index)

print(
 f"{data_before} - {data_before-data_after} = {data_after} sets of data remaining."
)
del data_before, data_after, filtered, df_filter

# find and drop all routes with errors!


In [None]:
def check_route_self_crossing(route):
 if isinstance(route, float):
 print(float)
 return not LineString(route).is_simple


data_before = len(collected_data.index)
collected_data = collected_data[
 ~collected_data["route"].mapply(check_route_self_crossing)
]
data_after = len(collected_data.index)
print(
 f"{data_before} - {data_before-data_after} = {data_after} sets of data remaining."
)
del data_before, data_after

# distribution over costs and points in routes!

In [None]:
QUANTILE_LIMIT: Final[float] = 0.95
if "DATA_UPPER_LIMIT_QUANTIL" not in locals():
 DATA_UPPER_LIMIT_QUANTIL: Final[float] = collected_data["cost"].quantile(
 QUANTILE_LIMIT
 )
 OVER_QUANTILE: Final[int] = int(len(collected_data.index) * (1 - QUANTILE_LIMIT))
print(
 f"{OVER_QUANTILE} entries over the {QUANTILE_LIMIT} quantile at {DATA_UPPER_LIMIT_QUANTIL:.3f}"
)

In [None]:
collected_data["cost"].plot.hist(bins=10, log=False) # find a drop limit
plt.axvline(x=DATA_UPPER_LIMIT_QUANTIL, color="RED", label="95% Quantil")
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(15, 25))
for count, (seed, row) in tqdm(
 enumerate(
 collected_data[collected_data["cost"] > DATA_UPPER_LIMIT_QUANTIL]
 .sort_values("cost")
 .iloc[0 :: int(OVER_QUANTILE / 12)]
 .iloc[:12]
 .iterrows()
 ),
 total=12,
):
 plt.subplot(5, 3, count + 1)
 plot_situation(
 destination=Point(row.destination_x, row.destination_y),
 obstacles=row.generated_obstacles,
 obstacle_color="RED",
 route=row.route_generated,
 title=f"Cost: {row.cost}",
 )
plt.show()

In [None]:
collected_data = collected_data.loc[collected_data["cost"] < DATA_UPPER_LIMIT_QUANTIL]
collected_data

In [None]:
collected_data["cost"].plot.hist(log=True)
plt.show()

In [None]:
collected_data[collected_data["cost"] < DATA_UPPER_LIMIT_QUANTIL]

plt.figure(figsize=(17.5, 25))
for count, (seed, row) in enumerate(
 collected_data[collected_data["cost"] < DATA_UPPER_LIMIT_QUANTIL]
 .sort_values("cost")
 .iloc[1:600:51]
 .iterrows()
):
 plt.subplot(4, 3, count + 1)
 plot_situation(
 destination=Point(row.destination_x, row.destination_y),
 obstacles=row.generated_obstacles,
 obstacle_color="RED",
 route=row.route_generated,
 title=f"Cost: {row.cost:.3f}",
 legend=count == 0,
 )
plt.show()
del seed

# Visualize Complexity

In [None]:
def get_route_points(data):
 df = data["route"].apply(lambda r: r.shape[0] - 1)
 df.name = "route complexity"
 return df


route_points = get_route_points(collected_data)

In [None]:
route_points.plot.hist()
plt.show()

In [None]:
routes_before = len(collected_data.index)
collected_data = collected_data[route_points <= 15]
routes_after = len(collected_data.index)
print(
 f"{routes_before} - {routes_before - routes_after} = {routes_after} "
 f"if only routes with less then 15 course changes remain."
)

In [None]:
get_route_points(collected_data).plot.hist(bins=13)
plt.show()

In [None]:
get_route_points(collected_data).value_counts().sort_index()

# Dropping routes that are too easy

In [None]:
LIMIT_SIMPLE_CASES = 0.05
values = get_route_points(collected_data).value_counts().sort_index()
chance_limit = (
 (len(collected_data.index) * LIMIT_SIMPLE_CASES * (1 - LIMIT_SIMPLE_CASES))
 / values.get(1, 1)
 if 1 in values.index
 else 1
)
print(
 f"Limiting simple cases to {LIMIT_SIMPLE_CASES * 100:.1f}% of the total routes. Reducing simple routes to {(chance_limit * 100):.1f}% of their volume."
)

In [None]:
collected_data = collected_data[
 (
 (get_route_points(collected_data) > 1)
 | (np.random.random(len(collected_data.index)) < chance_limit)
 )
]
get_route_points(collected_data).plot.hist(bins=13)
plt.show()

In [None]:
get_route_points(collected_data).value_counts().sort_index()

In [None]:
collected_data

In [None]:
del chance_limit

# Memory consumption

In [None]:
collected_data

In [None]:
def generate_image_maps(row, route_type: Literal["dot", "line"]):
 img = np.expand_dims(
 np.asarray(
 generate_image_from_map(
 obstacles=row.generated_obstacles,
 destination=Point(row.destination_x, row.destination_y),
 route=row.route_generated,
 route_type=route_type,
 seed=row.name,
 )
 ),
 axis=0,
 )
 img = img // 0xFF
 return img


generated = collected_data.head().apply(generate_image_maps, axis=1, args=("dot",))
humanize.naturalsize(generated.memory_usage(deep=True))

In [None]:
if "image" in collected_data.columns:
 del collected_data["image"]

In [None]:
DATA_WITH_IMG_PATH: Final[str] = "data/collected_and_filtered.pickle"
if os.path.exists(DATA_WITH_IMG_PATH) and not GENERATE_NEW:
 collected_data = pd.read_pickle(DATA_WITH_IMG_PATH)
else:
 collected_data.to_pickle(DATA_WITH_IMG_PATH)

In [None]:
image_series = collected_data.progress_apply(
 generate_image_maps, axis=1, args=("line",)
)

# collected_data["image_lines"] = collected_data.apply(
# generate_image_maps, axis=1, args=("line",)
# )

In [None]:
collected_routes = np.concatenate(image_series)
del image_series

In [None]:
humanize.naturalsize(sys.getsizeof(collected_routes))

In [None]:
collected_routes.dtype

In [None]:
memory = sorted(
 [
 (x, sys.getsizeof(globals().get(x)))
 for x in dir()
 if not x.startswith("_") and x not in sys.modules
 ],
 key=lambda x: x[1],
 reverse=True,
)
memory = {name: humanize.naturalsize(mem) for name, mem in memory[:10]}
memory

In [None]:
COLLECTED_ROUTES_DUMP = "data/collected_routes_np.pickle"
with open(COLLECTED_ROUTES_DUMP, "wb") as f:
 pickle.dump(collected_routes, f)

# with open(COLLECTED_ROUTES_DUMP,'rb') as f: collected_routes = pickle.load(f)

[Pix2Pix Tensorflow](https://www.tensorflow.org/tutorials/generative/pix2pix)

In [None]:
# Source: https://www.tensorflow.org/tutorials/generative/pix2pix
def downsample(filters, size, apply_batchnorm=True):
 initializer = tf.random_normal_initializer(mean=0.0, stddev=0.02)

 result = tf.keras.Sequential()
 result.add(
 tf.keras.layers.Conv2D(
 filters,
 size,
 strides=2,
 padding="same",
 kernel_initializer=initializer,
 use_bias=False,
 )
 )

 if apply_batchnorm:
 result.add(tf.keras.layers.BatchNormalization())

 result.add(tf.keras.layers.LeakyReLU())

 return result


downsample(64, 4)

In [None]:
collected_routes[0].shape

In [None]:
tf.expand_dims(collected_routes[0], 0).shape

In [None]:
down_model = downsample(3, 4)
tf.cast(tf.expand_dims(collected_routes[1], 0), "float16", name=None)

down_result = down_model(
 tf.cast(tf.expand_dims(collected_routes[1], 0), "float16", name=None)
)
print(down_result.shape)

In [None]:
# Source: https://www.tensorflow.org/tutorials/generative/pix2pix
def upsample(filters, size, apply_dropout=False):
 initializer = tf.random_normal_initializer(0.0, 0.02)

 result = tf.keras.Sequential()
 result.add(
 tf.keras.layers.Conv2DTranspose(
 filters,
 size,
 strides=2,
 padding="same",
 kernel_initializer=initializer,
 use_bias=False,
 )
 )

 result.add(tf.keras.layers.BatchNormalization())

 if apply_dropout:
 result.add(tf.keras.layers.Dropout(0.5))

 result.add(tf.keras.layers.ReLU())

 return result

In [None]:
up_model = upsample(3, 4)
up_result = up_model(down_result)
up_result.shape

In [None]:
def model_generator():

 inputs = tf.keras.layers.Input(shape=[IMG_SIZE, IMG_SIZE, 2])

 # down_stack = [
 # downsample(64, 4, apply_batchnorm=False), # (batch_size, 64, 64, 128)
 # downsample(128, 4), # (batch_size, 8, 8, 512)
 # downsample(512, 4), # (batch_size, 4, 4, 512)
 # downsample(512, 4), # (batch_size, 2, 2, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # ]
 #
 # up_stack = [
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4), # (batch_size, 16, 16, 1024)
 # upsample(128, 4), # (batch_size, 32, 32, 512)
 # upsample(64, 4), # (batch_size, 64, 64, 256)
 # ]

 down_stack = [
 downsample(64, 4, apply_batchnorm=False), # (batch_size, 64, 64, 128)
 downsample(128, 4), # (batch_size, 8, 8, 512)
 downsample(256, 4), # (batch_size, 4, 4, 512)
 downsample(256, 4), # (batch_size, 2, 2, 512)
 downsample(256, 4), # (batch_size, 1, 1, 512)
 downsample(512, 4), # (batch_size, 1, 1, 512)
 downsample(512, 4), # (batch_size, 1, 1, 512)
 ]

 up_stack = [
 upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4), # (batch_size, 16, 16, 1024)
 upsample(128, 4), # (batch_size, 32, 32, 512)
 upsample(64, 4), # (batch_size, 64, 64, 256)
 ]

 initializer = tf.random_normal_initializer(0.0, 0.02)
 last = tf.keras.layers.Conv2DTranspose(
 1,
 4,
 strides=2,
 padding="same",
 kernel_initializer=initializer,
 activation="tanh",
 ) # (batch_size, 256, 256, 3)

 x = inputs

 # Down sampling through the model
 skips = []
 for down in down_stack:
 x = down(x)
 skips.append(x)

 skips = reversed(skips[:-1])

 # Up sampling and establishing the skip connections
 for up, skip in zip(up_stack, skips):
 x = up(x)
 x = tf.keras.layers.Concatenate()([x, skip])

 x = last(x)

 return tf.keras.Model(inputs=inputs, outputs=x)


generator = model_generator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)

In [None]:
def model_generator():

 inputs = tf.keras.layers.Input(shape=[IMG_SIZE, IMG_SIZE, 2])

 # down_stack = [
 # downsample(64, 4, apply_batchnorm=False), # (batch_size, 64, 64, 128)
 # downsample(128, 4), # (batch_size, 8, 8, 512)
 # downsample(512, 4), # (batch_size, 4, 4, 512)
 # downsample(512, 4), # (batch_size, 2, 2, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # downsample(512, 4), # (batch_size, 1, 1, 512)
 # ]
 #
 # up_stack = [
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 # upsample(512, 4), # (batch_size, 16, 16, 1024)
 # upsample(128, 4), # (batch_size, 32, 32, 512)
 # upsample(64, 4), # (batch_size, 64, 64, 256)
 # ]

 down_stack = [
 downsample(64, 4, apply_batchnorm=False), # (batch_size, 64, 64, 128)
 downsample(128, 4), # (batch_size, 8, 8, 512)
 downsample(256, 4), # (batch_size, 4, 4, 512)
 downsample(256, 4), # (batch_size, 2, 2, 512)
 downsample(256, 4), # (batch_size, 1, 1, 512)
 downsample(512, 4), # (batch_size, 1, 1, 512)
 downsample(512, 4), # (batch_size, 1, 1, 512)
 ]

 up_stack = [
 upsample(512, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4, apply_dropout=True), # (batch_size, 4, 4, 1024)
 upsample(256, 4), # (batch_size, 16, 16, 1024)
 upsample(128, 4), # (batch_size, 32, 32, 512)
 upsample(64, 4), # (batch_size, 64, 64, 256)
 ]

 initializer = tf.random_normal_initializer(0.0, 0.02)
 last = tf.keras.layers.Conv2DTranspose(
 1,
 4,
 strides=2,
 padding="same",
 kernel_initializer=initializer,
 activation="tanh",
 ) # (batch_size, 256, 256, 3)

 x = inputs

 # Down sampling through the model
 skips = []
 for down in down_stack:
 x = down(x)
 skips.append(x)

 skips = reversed(skips[:-1])

 # Up sampling and establishing the skip connections
 for up, skip in zip(up_stack, skips):
 x = up(x)
 x = tf.keras.layers.Concatenate()([x, skip])

 x = last(x)

 return tf.keras.Model(inputs=inputs, outputs=x)


generator = model_generator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)

In [None]:
!pip install pydot

In [None]:
!pip install pydotplus

In [None]:
generator.compile(
 optimizer=tf.keras.optimizers.RMSprop(), # Optimizer
 # Loss function to minimize
 loss="mean_squared_error",
 # tf.keras.losses.SparseCategoricalCrossentropy(),
 # List of metrics to monitor
 metrics=[
 "binary_crossentropy",
 "mean_squared_error",
 "mean_absolute_error",
 ], # root_mean_squared_error
)

In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(
 monitor="mean_squared_error",
 min_delta=0.0005,
 patience=2,
 verbose=0,
 mode="auto",
 restore_best_weights=True,
)

tf_board = tf.keras.callbacks.TensorBoard(
 log_dir="./log_dir",
 histogram_freq=100,
 write_graph=False,
 write_images=False,
 write_steps_per_second=True,
 update_freq="epoch",
 profile_batch=(20, 40),
 embeddings_freq=0,
 embeddings_metadata=None,
)

reduce_learning_rate = tf.keras.callbacks.ReduceLROnPlateau(
 monitor="some metric", factor=0.2, patience=5, min_lr=000.1, verbose=1
)

In [None]:
plt.figure(figsize=(17.5, 25))
np_array = np.flip(collected_routes[1, :, :, :], axis=0)

for chanel in tqdm(range(3)):
 plt.subplot(1, 4, chanel + 1)
 plt.imshow(np_array[:, :, chanel], interpolation="nearest")
plt.subplot(1, 4, 4)
plt.imshow(0x88 * np_array[:, :, 0] + 0xFF * np_array[:, :, 2], interpolation="nearest")
plt.show()

In [None]:
collected_routes[:, :, :, :2].shape

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices(
 (collected_routes[:, :, :, :2], collected_routes[:, :, :, 2])
)
# test_dataset = tf.data.Dataset.from_tensor_slices((test_examples, test_labels))

In [None]:
train_dataset

In [None]:
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 100
# train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)

In [None]:
train_dataset = train_dataset.batch(BATCH_SIZE)

In [None]:
history = generator.fit(
 train_dataset,
 epochs=20,
 batch_size=512,
 use_multiprocessing=True,
 workers=5,
 callbacks=[early_stop, tf_board],
 # tqdm_callback,
)

In [None]:
plt.plot(history.history["loss"])

In [None]:
collected_routes[0:1, :, :, :2].shape

In [None]:
predicted = generator.predict(
 collected_routes[:100, :, :, :2],
 batch_size=None,
 verbose="auto",
 steps=None,
 callbacks=None,
 max_queue_size=10,
 workers=3,
 use_multiprocessing=True,
)

In [None]:
predicted.shape

In [None]:
plt.imshow(predicted[1, :, :, 0], interpolation="nearest")
plt.show()

In [None]:
for pos in range(5):
 plt.imshow(
 predicted[pos, :, :, 0] * 0xFF + collected_routes[pos, :, :, 0] * 20,
 interpolation="nearest",
 )
 plt.show()

In [None]:
# tf.keras.utils.plot_model(generator)

@article{article,
author = {Jang, Hoyun and Lee, Inwon and Seo, Hyoungseock},
year = {2017},
month = {09},
pages = {4109-4117},
title = {Effectiveness of CFRP rudder aspect ratio for scale model catamaran racing yacht test},
volume = {31},
journal = {Journal of Mechanical Science and Technology},
doi = {10.1007/s12206-017-0807-8}
}

Ich würde auch zu 1. tendieren, stimme Ihnen aber zu, dass das Thema sehr umfangreich ist. Könnte man sich nicht einen Teilbereich herauspicken? Ich verstehe nicht viel vom Segeln, daher lassen Sie mich kurz zusammenfassen, was Sie vorhaben: - Sie generieren Trainingsdaten mit dem existierenden aber langsamen GD Algorithmus. Ich nehme an, es handelt sich um lokale Routen in einem relativ kleinen Kartenausschnitt. Lässt es die Laufzeit zu, dass Sie eine große Menge an Routen berechnen. - Sie haben dann eine Karte und als Ausgabe eine Liste der Wendepunkte - Warum wollen Sie daraus eine Heatmap berechnen? Diesen Schritt habe ich noch nicht verstanden - Wenn Sie aus einer Karte eine Heatmap trainieren wollen und dafür genügend Beispiele haben, könnnten GANs hilfreich sein: https://arxiv.org/abs/1611.07004 Ich würde Ihnen raten, das Problem möglichst so zu reduzieren, dass es im Rahmen des Moduls noch handhabbar bleibt. Alles Weitere kann man sich auch für spätere Arbeiten aufbewahren. Das 2. Thema ist auch ok. Aber vielleicht nicht ganz so spannend. Ich überlasse Ihnen die Entscheidung. Freundliche Grüße Heiner Giefers