304 lines
9.6 KiB
Python
304 lines
9.6 KiB
Python
import glob
|
|
import os
|
|
from typing import Optional
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import pandas as pd
|
|
from PIL import ImageDraw
|
|
from PIL.Image import Image
|
|
from shapely.geometry import Polygon, Point
|
|
from shapely.ops import unary_union
|
|
from tqdm import tqdm
|
|
|
|
import experiments
|
|
from pyrate.plan.nearplanner.timing_frame import TimingFrame
|
|
|
|
# Module-level configuration of the route optimizer used by `experiments`.
# NOTE(review): presumably `n_iter_grad` caps the number of gradient iterations
# and `verbose` toggles optimizer logging -- confirm against `experiments`.
experiments.optimization_param.n_iter_grad = 50
experiments.optimization_param.verbose = False

# Destinations are drawn with integer coordinates in [-SIZE_INNER, SIZE_INNER).
SIZE_INNER = 75
# Half the edge length of the map: plot axes span [-SIZE_ROUTE, SIZE_ROUTE],
# polygon centers are drawn from that range and images are 2 * SIZE_ROUTE wide.
SIZE_ROUTE = 100
# A destination is rejected while |x| or |y| is below this value.
MIN_DESTINATION_DISTANCE = 25
|
|
|
|
|
|
# https://stackoverflow.com/questions/16444719/python-numpy-complex-numbers-is-there-a-function-for-polar-to-rectangular-co
|
|
def polar_to_cartesian(
    radii: np.ndarray,
    angles: np.ndarray,
):
    """Convert polar coordinates into cartesian points encoded as complex numbers.

    Args:
        radii: The radii of the points.
        angles: The angles of the points, given as fractions of a full turn
            (each value is multiplied by ``2 * pi`` before use).

    Returns:
        Complex values; the real part is the x coordinate and the imaginary
        part is the y coordinate of each point.
    """
    # Unit phasors for every angle; scaling by the radii yields the points.
    unit_phasors = np.exp(2j * angles * np.pi)
    return radii * unit_phasors
|
|
|
|
|
|
def cartesian_to_polar(
    x: np.ndarray,
):
    """Split complex-encoded cartesian points into polar coordinates.

    Args:
        x: Complex numbers whose real/imaginary parts are the x/y coordinates.

    Returns:
        A tuple ``(distances, angles)`` holding the magnitude of every value
        and its angle as returned by ``numpy.angle`` (radians).
    """
    distances = abs(x)
    angles = np.angle(x)
    return distances, angles
|
|
|
|
|
|
def random_polygon(
    radius_mean: float = 2,
    radius_sigma: float = 1.5,
):
    """Generate a random polygon whose corners lie on one common circle.

    The polygon has between 3 and 9 corners. All corners share a single radius
    drawn from a log-normal distribution and are placed at sorted random
    angles around a randomly chosen integer center.

    Args:
        radius_mean: The ``mean`` argument passed to ``numpy.random.lognormal``
            when drawing the circle radius.
        radius_sigma: The ``sigma`` argument passed to
            ``numpy.random.lognormal`` when drawing the circle radius.

    Returns:
        A shapely ``Polygon`` built from the generated corners.
    """
    corner_count = np.random.randint(3, 10)

    # The draw order (radius, angles, center) is kept fixed so that a seeded
    # global numpy RNG keeps producing the same polygons.
    radius = np.random.lognormal(radius_mean, radius_sigma)
    turns = np.sort(np.random.rand(corner_count))
    corners = polar_to_cartesian(radius, turns)

    center = np.random.randint(low=-SIZE_ROUTE, high=SIZE_ROUTE, size=(2,))

    # One (x, y) row per corner, shifted to the chosen center.
    vertices = np.column_stack((np.real(corners), np.imag(corners))) + center
    return Polygon(vertices)
|
|
|
|
|
|
def generate_obstacles(
    seed: Optional[int] = None,
    number_of_polygons: int = 40,
    radius_mean: float = 2,
    radius_sigma: float = 1,
) -> dict[str, Polygon]:
    """Generate a set of obstacles as the union of random polygons.

    Overlapping polygons are merged, so every returned obstacle is one
    connected part of the union of all accepted polygons.

    Args:
        seed: If not None, seeds the global numpy random generator first.
        number_of_polygons: The number of polygons that are drawn. Polygons
            that are rejected are not redrawn, so fewer may be used.
        radius_mean: Forwarded to ``random_polygon`` as ``radius_mean``.
        radius_sigma: Forwarded to ``random_polygon`` as ``radius_sigma``.

    Returns:
        A dict mapping the running index (as a string) to each merged
        obstacle. The dict is empty if no polygon was accepted.
    """
    if seed is not None:
        np.random.seed(seed)

    origin = Point(0, 0)
    polygons = []
    for _ in range(number_of_polygons):
        poly = random_polygon(radius_mean, radius_sigma)
        # Keep the origin free: drop polygons that cover it ...
        if poly.contains(origin):
            continue
        # ... or whose boundary comes closer than 1 unit to it.
        if poly.exterior.distance(origin) < 1:
            continue
        polygons.append(poly)

    merged = unary_union(polygons)
    if merged.is_empty:
        return {}
    # The union is a single Polygon (which has no ``geoms``) when everything
    # merges into one connected shape or only one polygon was accepted.
    parts = list(merged.geoms) if hasattr(merged, "geoms") else [merged]
    return {str(index): part for index, part in enumerate(parts)}
|
|
|
|
|
|
def generate_destination(obstacles: dict[str, Polygon], seed: Optional[int] = None) -> Point:
    """Generate a valid destination for a map.

    Integer points are drawn until one is found whose x and y coordinates both
    have an absolute value of at least ``MIN_DESTINATION_DISTANCE`` and that is
    not contained in any obstacle.

    Args:
        obstacles: The obstacles of the map, keyed by name.
        seed: If not None, seeds the global numpy random generator first.

    Returns:
        The destination point.
    """
    if seed is not None:
        np.random.seed(seed)

    # Rejection sampling: keep drawing until a candidate passes every check.
    while True:
        candidate = Point(np.random.randint(-SIZE_INNER, SIZE_INNER, size=(2,), dtype=int))
        if abs(candidate.x) < MIN_DESTINATION_DISTANCE:
            continue
        if abs(candidate.y) < MIN_DESTINATION_DISTANCE:
            continue
        if any(obstacle.contains(candidate) for obstacle in obstacles.values()):
            continue
        return candidate
|
|
|
|
|
|
def plot_situation(
    obstacles: dict[str, Polygon],
    destination: Point,
    obstacle_color: Optional[str] = None,
    route: Optional[TimingFrame] = None,
    legend: bool = True,
    title: Optional[str] = None,
) -> None:
    """Plot obstacles, wind arrow, route, destination and start with matplotlib.

    Args:
        obstacles: The obstacles of the map, keyed by name.
        destination: The destination that should be reached.
        obstacle_color: The color used for every obstacle. If None, each
            obstacle is filled without an explicit color and without a label.
        route: The route to plot; its ``points`` are drawn as a line.
        legend: If true, a legend without duplicate labels is shown.
        title: The title of the plot, if any.

    Returns:
        None
    """
    plt.figure(figsize=(8, 8))
    plt.axis([-SIZE_ROUTE, SIZE_ROUTE, -SIZE_ROUTE, SIZE_ROUTE])

    if title:
        plt.title(title)

    # Obstacles: one shared color and label if requested, otherwise defaults.
    if obstacles:
        fill_options = {}
        if obstacle_color is not None:
            fill_options = {"color": obstacle_color, "label": "Obstacle"}
        for shape in obstacles.values():
            plt.fill(*shape.exterior.xy, **fill_options)

    # Wind arrow near the top edge of the plot, pointing downwards.
    # https://www.geeksforgeeks.org/matplotlib-pyplot-arrow-in-python/
    arrow_start_y = +int(SIZE_ROUTE * 0.9)
    arrow_delta_y = -int(SIZE_ROUTE * 0.1)
    plt.arrow(
        0,
        arrow_start_y,
        0,
        arrow_delta_y,
        head_width=10,
        width=4,
        label="Wind (3Bft)",
    )

    if route:
        waypoints = route.points
        plt.plot(waypoints[:, 0], waypoints[:, 1], color="BLUE", marker=".")

    # Destination (if any) and the start at the origin.
    if destination:
        plt.scatter(*destination.xy, marker="X", color="green", label="Destination")
    plt.scatter(0, 0, marker="o", color="green", label="Start")

    if legend:
        # Collapse repeated labels so each one appears only once.
        # https://stackoverflow.com/questions/13588920/stop-matplotlib-repeating-labels-in-legend
        handles, labels = plt.gca().get_legend_handles_labels()
        unique_entries = dict(zip(labels, handles))
        plt.legend(unique_entries.values(), unique_entries.keys())
    plt.show()
|
|
|
|
|
|
def generate_image_from_map(
    obstacles: dict[str, Polygon],
    destination: Point,
    route: Optional[list[TimingFrame]],
) -> Image:
    """Generate an image from the map.

    Can be used to feed an ANN.
    - The background is white.
    - Obstacles are marked as red.
    - The destination is marked as a single green pixel.

    Map coordinates are shifted by ``SIZE_ROUTE`` so that the map origin ends
    up in the center of the ``2 * SIZE_ROUTE`` square image.

    Args:
        obstacles: A dict of obstacles as shapely Polygons. Keyed as a string.
        destination: A destination that should be navigated to.
        route: Currently not drawn; accepted so callers can already pass it.

    Returns:
        The rendered RGB image.
    """
    # The module-level name ``Image`` is the ``PIL.Image.Image`` class, which
    # has no ``new`` factory; the factory is a function of the ``PIL.Image``
    # module, so that module is imported locally under a distinct name.
    from PIL import Image as pil_image_module

    img = pil_image_module.new(
        "RGB",
        (SIZE_ROUTE * 2, SIZE_ROUTE * 2),
        "#ffffff",
    )
    draw = ImageDraw.Draw(img)
    for polygon in obstacles.values():
        # Flatten the exterior into [x0, y0, x1, y1, ...] in image coordinates.
        flat_outline = np.dstack(polygon.exterior.xy).reshape((-1)) + SIZE_ROUTE
        draw.polygon(
            flat_outline.tolist(),
            fill="#FF0000",
            outline="#FF0000",
        )
    # Use the same offset as for the obstacles instead of a hard-coded value.
    destination_pixel = (
        int(destination.x) + SIZE_ROUTE,
        int(destination.y) + SIZE_ROUTE,
    )
    img.putpixel(destination_pixel, (0, 0xFF, 0))
    return img
|
|
|
|
|
|
def generate_all_to_series(seed: Optional[int] = None, image: bool = False) -> pd.Series:
    """Generate a map, a destination and a route and collect them in a `pd.Series`.

    Args:
        seed: The seed used to generate both the obstacles and the destination.
        image: If true the map image is rendered as well; otherwise the entry
            is left as ``pd.NA`` (e.g. to save memory).

    Returns:
        A `pd.Series` named after the seed with the entries:
        - ``seed``: The seed as a string.
        - ``obstacles``: The dict of obstacle polygons.
        - ``destination_x`` / ``destination_y``: The destination coordinates.
        - ``image``: The rendered image or ``pd.NA``.
        - ``route``: The points of the generated route or ``pd.NA``.
        - ``cost``: The cost of the generated route or ``pd.NA``.
    """
    obstacles = generate_obstacles(seed)
    destination = generate_destination(obstacles, seed)

    # Best effort: a failing route generation is reported and recorded as
    # missing instead of aborting the whole data generation.
    try:
        route, _ = experiments.generate_route(
            position=Point(0, 0), goal=destination, obstacles=obstacles, wind=(18, 180)
        )
    except Exception as error:
        print("Error")
        print(error)
        route = None

    seed_label = str(seed)

    if image:
        rendered = generate_image_from_map(obstacles, destination, route)
    else:
        rendered = pd.NA

    if route:
        route_points = route.points
        route_cost = route.cost
    else:
        route_points = pd.NA
        route_cost = pd.NA

    entries = {
        "seed": seed_label,
        "obstacles": obstacles,
        "destination_x": destination.x,
        "destination_y": destination.y,
        "image": rendered,
        "route": route_points,
        "cost": route_cost,
    }
    return pd.Series(data=entries, name=seed_label)
|
|
|
|
|
|
if __name__ == "__main__":
    # Runtime configuration is read from environment variables.
    save_frequency = int(os.getenv("save_frequency", "50"))
    start_seed = int(os.getenv("seed_start", "0"))
    continues = os.getenv("continues", "true").lower() == "true"
    print(f"Save Frequency: {save_frequency}")
    print(f"Start seed: {start_seed}")
    print(f"Continues: {continues}")

    # Seed groups that already have a result file or a temporary marker file.
    # The slice strips the 9-character "data/raw_" / "data/tmp_" prefix and
    # the 7-character ".pickle" suffix, leaving the zero-padded seed.
    claimed_groups = set()
    for path in glob.glob("data/raw_*.pickle") + glob.glob("data/tmp_*.pickle"):
        claimed_groups.add(int(path[9:-7]))

    for group_start in range(start_seed, 10_000_000_000, save_frequency):
        if group_start in claimed_groups:
            continue
        print(f"Start generating routes for seed: {group_start}")

        # An empty marker file is written before the work starts and removed
        # once the real result has been saved.
        marker_path: str = f"data/tmp_{group_start:010}.pickle"
        pd.DataFrame().to_pickle(marker_path)

        rows = [
            generate_all_to_series(i, image=False)
            for i in tqdm(range(group_start, group_start + save_frequency, 1))
        ]
        frame = pd.DataFrame(rows).set_index("seed")

        result_path = f"data/raw_{group_start:010}.pickle"
        frame.to_pickle(result_path)
        os.remove(marker_path)

        # Without "continues" only a single seed group is generated per run.
        if not continues:
            break
|