import glob import os from typing import Optional import matplotlib.pyplot as plt import numpy as np import pandas as pd from PIL import ImageDraw from PIL.Image import Image from shapely.geometry import Polygon, Point from shapely.ops import unary_union from tqdm import tqdm import experiments from pyrate.plan.nearplanner.timing_frame import TimingFrame experiments.optimization_param.n_iter_grad = 50 experiments.optimization_param.verbose = False SIZE_INNER = 75 SIZE_ROUTE = 100 MIN_DESTINATION_DISTANCE = 25 # https://stackoverflow.com/questions/16444719/python-numpy-complex-numbers-is-there-a-function-for-polar-to-rectangular-co def polar_to_cartesian( radii: np.ndarray, angles: np.ndarray, ): """Transforms polar coordinates into cartesian coordinates. Args: radii: A array of radii. angles: A array of angles. Returns: An array of cartesian coordinates. """ return radii * np.exp(2j * angles * np.pi) def cartesian_to_polar( x: np.ndarray, ): """Transforms cartesian coordinates into polar coordinates. Args: x: A set of complex number to be separated into polar coordinates. Returns: An distance array and an angle array. """ return abs(x), np.angle(x) def random_polygon( radius_mean: float = 2, radius_sigma: float = 1.5, ): """Generates the simplest of polygons, a triangle with a size described by a random polygon. Args: radius_mean: The average radius defining a circumcircle of a triangle. radius_sigma: The variance of a radius defining a circumcircle of a triangle. Returns: A single triangle. """ number_of_corners = np.random.randint(3, 10) array = polar_to_cartesian( np.random.lognormal(radius_mean, radius_sigma), np.sort(np.random.rand(number_of_corners)), ) offset = np.random.randint(low=-SIZE_ROUTE, high=SIZE_ROUTE, size=(2,)) return_values = np.zeros((number_of_corners, 2), dtype=float) return_values[:] = offset return_values[:, :] += np.array((np.real(array), np.imag(array))).T return Polygon(return_values) def generate_obstacles( seed=None, number_of_polygons: int = 40, radius_mean: float = 2, radius_sigma: float = 1, ) -> dict[str, Polygon]: """Generates a set of obstacles from a union of triangles. The union of triangles meas that if polygons overlap o polygon containing the union of those polygons is returned. Args: seed: A seed to generate a set of obstacles from. number_of_polygons: The number of polygons that should be drawn. radius_mean: The average radius defining a circumcircle of an obstacle triangle. radius_sigma: The variance of a radius defining a circumcircle of an obstacle triangle. Returns: A list of unified obstacles. """ if seed is not None: np.random.seed(seed) polygons = [] for _ in range(number_of_polygons): poly = random_polygon(radius_mean, radius_sigma) if poly.contains(Point(0, 0)): continue if poly.exterior.distance(Point(0, 0)) < 1: continue polygons.append(poly) polygon_list = list(unary_union(polygons).geoms) return {str(i): p for i, p in enumerate(polygon_list)} def generate_destination(obstacles: dict[str, Polygon], seed: Optional[int] = None) -> Point: """Generates for a map. Can be used to generate a valid destination for list of obstacles. Args: obstacles: A list of obstacles. seed: The seed determining the point. Returns: A goal that should be reached by the ship. """ # sets the seed if seed is not None: np.random.seed(seed) # generates the point point: Optional[Point] = None while ( point is None or abs(point.x) < MIN_DESTINATION_DISTANCE or abs(point.y) < MIN_DESTINATION_DISTANCE or any(obstacle.contains(point) for obstacle in obstacles.values()) ): point = Point(np.random.randint(-SIZE_INNER, SIZE_INNER, size=(2,), dtype=int)) return point def plot_situation( obstacles: dict[str, Polygon], destination: Point, obstacle_color: Optional[str] = None, route: Optional[TimingFrame] = None, legend: bool = True, title: Optional[str] = None, ) -> None: """PLots the obstacles into a matplotlib plot. Args: obstacles: A list of obstacles. destination: The destination that should be reached by the boat. obstacle_color: The color the obstacles should have. Can be None. If none all obstacles will have different colors. route: The route that should be plotted. legend: If true plots a legend. title: The title of the plot. Returns: None """ plt.figure(figsize=(8, 8)) plt.axis([-SIZE_ROUTE, SIZE_ROUTE, -SIZE_ROUTE, SIZE_ROUTE]) # Sets a title if one is demanded if title: plt.title(title) # Plots the obsticles. if obstacles: for polygon in obstacles.values(): if obstacle_color is not None: plt.fill(*polygon.exterior.xy, color=obstacle_color, label="Obstacle") else: plt.fill(*polygon.exterior.xy) # Plots the wind direction # https://www.geeksforgeeks.org/matplotlib-pyplot-arrow-in-python/ plt.arrow( 0, +int(SIZE_ROUTE * 0.9), 0, -int(SIZE_ROUTE * 0.1), head_width=10, width=4, label="Wind (3Bft)", ) if route: plt.plot(route.points[:, 0], route.points[:, 1], color="BLUE", marker=".") # Plots the estination if destination: plt.scatter(*destination.xy, marker="X", color="green", label="Destination") plt.scatter(0, 0, marker="o", color="green", label="Start") if legend: # https://stackoverflow.com/questions/13588920/stop-matplotlib-repeating-labels-in-legend handles, labels = plt.gca().get_legend_handles_labels() by_label = dict(zip(labels, handles)) plt.legend(by_label.values(), by_label.keys()) plt.show() def generate_image_from_map( obstacles: dict[str, Polygon], destination: Point, route: Optional[list[TimingFrame]], ) -> Image: """Generate an image from the map. Can be used to feed an ANN. - Obstacles are marked as reed. - The destination is marked as green. - The points where the route will likely change are blue. Args: obstacles: A dict of obstacles as shapely Polygons. Keyed as a string. destination: A destination that should be navigated to. """ img = Image.new( "RGB", (SIZE_ROUTE * 2, SIZE_ROUTE * 2), "#ffffff", ) draw = ImageDraw.Draw(img) for polygon in obstacles.values(): draw.polygon( list(np.dstack(polygon.exterior.xy).reshape((-1)) + SIZE_ROUTE), fill="#FF0000", outline="#FF0000", ) img.putpixel((int(destination.x) + 100, int(destination.y) + 100), (0, 0xFF, 0)) return img def generate_all_to_series(seed: Optional[int] = None, image: bool = False) -> pd.Series: """Generates everything and aggregates all data into a `pd:Series`. Args: seed:The seed that should be used to generate map and destination. image: If an image should be generated or if that should be postponed to save memory. Returns: Contains a `pd.Series`containing the following. - The seed tha generated the map. - The destination in x - The destination in y - A list of Obstacle polygons. - The route generated for this map by the roBOOTer navigation system. - Optionally the image containing all the information. Can be generated at a later date without the fear for a loss of accuracy. """ obstacles = generate_obstacles(seed) destination = generate_destination(obstacles, seed) try: route, _ = experiments.generate_route( position=Point(0, 0), goal=destination, obstacles=obstacles, wind=(18, 180) ) except Exception as e: print("Error") print(e) route = None return pd.Series( data={ "seed": str(seed), "obstacles": obstacles, "destination_x": destination.x, "destination_y": destination.y, "image": generate_image_from_map(obstacles, destination, route) if image else pd.NA, "route": route.points if route else pd.NA, "cost": route.cost if route else pd.NA, }, name=str(seed), ) if __name__ == "__main__": save_frequency = int(os.getenv("save_frequency", "50")) start_seed = int(os.getenv("seed_start", "0")) continues = os.getenv("continues", "true").lower() == "true" print(f"Save Frequency: {save_frequency}") print(f"Start seed: {start_seed}") print(f"Continues: {continues}") files = glob.glob("data/raw_*.pickle") + glob.glob("data/tmp_*.pickle") seed_groups = {int(file[9:-7]) for file in files} for next_seeds in range(start_seed, 10_000_000_000, save_frequency): if next_seeds in seed_groups: continue print(f"Start generating routes for seed: {next_seeds}") tmp_pickle_str: str = f"data/tmp_{next_seeds:010}.pickle" pd.DataFrame().to_pickle(tmp_pickle_str) df = pd.DataFrame( [ generate_all_to_series(i, image=False) for i in tqdm(range(next_seeds, next_seeds + save_frequency, 1)) ] ).set_index("seed") pickle_to_file = f"data/raw_{next_seeds:010}.pickle" df.to_pickle(pickle_to_file) os.remove(tmp_pickle_str) if not continues: break