738 lines
28 KiB
Python
738 lines
28 KiB
Python
"""
|
|
optimizer code for use in the near-planner ros-node
|
|
"""
|
|
|
|
# Dataclass Support
|
|
from dataclasses import dataclass
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import Tuple
|
|
from typing import Union
|
|
|
|
# Static Typing
|
|
from typing import cast
|
|
|
|
# Numerical Computing
|
|
import numpy as np
|
|
import shapely
|
|
import shapely.ops
|
|
from numpy import typing as npt
|
|
|
|
# Geometry
|
|
from pyrate.plan.geometry.route import CartesianRoute
|
|
from scipy import linalg
|
|
from scipy.special import softmax
|
|
from shapely.geometry import Point
|
|
|
|
from . import utils
|
|
from .cost_model import CostModel
|
|
from .evaluated_timing_frame import EvaluatedTimingFrame
|
|
from .exceptions import NoRouteFoundException
|
|
from .holders import EstimationParameters
|
|
from .holders import OptimizationParameters
|
|
from .obstacle import Obstacle
|
|
from .polar_model import PolarModel
|
|
from .timing_frame import TimingFrame
|
|
|
|
|
|
class Optimizer:
|
|
"""Debug class for interactive interpreter.
|
|
|
|
Object for initializing and optimizing (near) routes / planning based on obstacles and polar diagrams.
|
|
Provides optimization regarding sailing time and obstacle danger, which is evaluated
|
|
via cost functions.
|
|
|
|
Args:
|
|
wind_information: optional current tuple of ``(angle, speed)``, in (rad, strength in m/s)
|
|
obstacles: optional dict mapping obstacle keys to obstacle objects
|
|
heading: optional current heading angle of the boat, in radians, right hand from true north
|
|
position: optional current position of the sailboat, in ``(east, north)``/``(x, y)`` coordinates
|
|
"""
|
|
|
|
def __init__( # pylint: disable=R0913
|
|
self,
|
|
position: Optional[Point] = None,
|
|
obstacles: Optional[Dict[str, Obstacle]] = None,
|
|
heading: Optional[npt.NDArray[np.floating]] = None,
|
|
wind_information: Optional[Tuple[float, float]] = None,
|
|
) -> None:
|
|
# environment variables
|
|
wind_angle, wind_speed = wind_information if wind_information is not None else (0, 0)
|
|
|
|
self._wind_angle: np.floating = np.float64(wind_angle)
|
|
self._wind_speed: np.floating = np.float64(wind_speed)
|
|
|
|
self._grad_lim: np.floating = np.float32(5.0)
|
|
|
|
# boat model and goal
|
|
self.boat_polar: PolarModel = PolarModel(wind_speed, wind_angle)
|
|
|
|
# obstacles
|
|
self._obstacles: Dict[str, Obstacle] = obstacles or {}
|
|
|
|
self.cost_model: CostModel = CostModel(self._obstacles, self.boat_polar)
|
|
|
|
# boat position
|
|
self._heading: Optional[npt.NDArray[np.floating]] = heading
|
|
|
|
self.heading_dir: Optional[npt.NDArray[np.floating]] = np.array([1, 0])
|
|
if heading is not None:
|
|
self.heading_dir = np.array([np.cos(heading), np.sin(heading)])
|
|
|
|
self._position: Point = position if position is not None else Point(0, 0)
|
|
|
|
self._goal: Optional[Point] = None
|
|
|
|
def __str__(self):
|
|
return f"Optimizer(wind=({self._wind_angle},{self._wind_speed}), position={self._position}, goal={self._goal}, obs={self._obstacles.items()})"
|
|
|
|
# ---- UPDATE INTERFACES ---- #
|
|
@property
|
|
def position(self) -> Point:
|
|
"""Position of the boat held by the optimizer"""
|
|
return self._position
|
|
|
|
@position.setter
|
|
def position(self, value: Point) -> None:
|
|
self._position = value
|
|
|
|
@property
|
|
def goal(self) -> Optional[Point]:
|
|
"""Current sub goal we have to create a route to"""
|
|
return self._goal
|
|
|
|
@goal.setter
|
|
def goal(self, value: Point) -> None:
|
|
self._goal = value
|
|
|
|
@property
|
|
def heading(self) -> Optional[npt.NDArray[np.floating]]:
|
|
"""Current heading of the optimizer"""
|
|
return self._heading
|
|
|
|
@heading.setter
|
|
def heading(self, value: Point) -> None:
|
|
self._heading = value
|
|
self.heading_dir = np.array([np.cos(value), np.sin(value)])
|
|
|
|
def on_added_obstacles(self, obstacles: Dict[str, Obstacle]) -> None:
|
|
"""Event to call if obstacles added to simulation.
|
|
|
|
This event changes the underlying :class:`CostModel` to adapt to deleted obstacles.
|
|
|
|
Args:
|
|
obstacles: obstacles, optionally keyed by ids, to add
|
|
"""
|
|
self.cost_model.add_obstacle(obstacles)
|
|
|
|
def on_reset_obstacles(self, obstacles: Dict[str, Obstacle]) -> None:
|
|
"""Convenience Event to call if a drastic change in obstacle population has happened to simulation.
|
|
|
|
The same effects of this method can be achieved by separately calling :meth:`on_added_obstacles` and
|
|
:meth:`on_deleted_obstacles`. As the behaviours of these methods overlap this method provides a more
|
|
efficient, joint and easier way to deal with a reset or large movement in the stored obstacle
|
|
population.
|
|
|
|
Args:
|
|
obstacles: dictionary of obstacles to reset the stored obstacle population upon, keyed by
|
|
obstacle ids
|
|
"""
|
|
self.cost_model.rebase_obstacles(obstacles)
|
|
|
|
def on_deleted_obstacles(self, obstacle_id_s: Union[str, List[str]]) -> Optional[List[Obstacle]]:
|
|
"""Event to call to delete obstacles from simulation.
|
|
|
|
This event changes the underlying :class:`CostModel` to adapt to deleted obstacles.
|
|
|
|
Args:
|
|
obstacle_id_s: obstacle, optionally keyed by ids, by ids to delete
|
|
"""
|
|
deleted_obs = self.cost_model.delete_obstacle(obstacle_id_s)
|
|
return deleted_obs
|
|
|
|
@property
|
|
def wind_angle(self) -> float:
|
|
"""Wind angle to simulate"""
|
|
return float(self._wind_angle)
|
|
|
|
@wind_angle.setter
|
|
def wind_angle(self, value: float) -> None:
|
|
self._wind_angle = np.float64(value)
|
|
self.boat_polar.wind_direction = np.float64(value)
|
|
|
|
@property
|
|
def wind_speed(self) -> float:
|
|
"""Wind speed to simulate"""
|
|
return float(self._wind_speed)
|
|
|
|
@wind_speed.setter
|
|
def wind_speed(self, value: float) -> None:
|
|
self._wind_speed = np.float64(value)
|
|
self.boat_polar.wind_speed = np.float64(value)
|
|
|
|
# ---- OPTIMIZE LOGIC ---- #
|
|
|
|
def optimize( # noqa: C901
|
|
self,
|
|
goal: Point,
|
|
optimization_parameters: OptimizationParameters,
|
|
heading: Optional[npt.NDArray[np.floating]] = None,
|
|
custom: bool = True,
|
|
) -> Tuple[Optional[EvaluatedTimingFrame], Optional[List[EvaluatedTimingFrame]]]:
|
|
"""Main optimize method of :class:`Optimizer`.
|
|
|
|
It uses different methods to determine an optimized
|
|
:class:`~.evaluated_timing_frame.EvaluatedTimingFrame`
|
|
from a given heading, goal and optimization parameters.
|
|
|
|
Args:
|
|
goal: :class:`shapely.geometry.Point` specifying the route goal in local cartesian coordinates
|
|
optimization_parameters: :class`~dataclasses.dataclass` holding parameters for optimization
|
|
heading: :class:`np.ndarray` 2d vector describing the current heading of the boat
|
|
custom: to use custom method (:meth:`optimize_gradient`)
|
|
|
|
Returns:
|
|
optimized :class:`~.evaluated_timing_frame.EvaluatedTimingFrame` and an optional list of
|
|
intermediate results produced during the optimization and selection algorithm
|
|
|
|
Raises:
|
|
NoRouteFoundException: if no route was found during route discovery
|
|
"""
|
|
# pylint: disable-msg=too-many-locals
|
|
|
|
if goal is not None:
|
|
self._goal = goal
|
|
if heading is not None:
|
|
self._heading = heading
|
|
|
|
assert self._goal is not None
|
|
assert self.wind_angle is not None
|
|
assert self.wind_speed is not None
|
|
assert self.position is not None
|
|
|
|
# prepare initial guess
|
|
init_frames: List[EvaluatedTimingFrame] = []
|
|
final_frames: List[EvaluatedTimingFrame] = []
|
|
costs = []
|
|
construction_successful = False
|
|
|
|
if optimization_parameters.overwrite_grad_lim:
|
|
self.grad_lim = optimization_parameters.overwrite_grad_lim
|
|
# todo (BEN)
|
|
|
|
if optimization_parameters.estimation_parameters.first_try_construction:
|
|
constructed_routes = self._construct_init(
|
|
goal=self._goal, parameters=optimization_parameters.estimation_parameters
|
|
)
|
|
|
|
# remove cycles
|
|
constructed_routes = [f.remove_single_cycles() for f in constructed_routes]
|
|
|
|
# simulate
|
|
_ = [f.update_times(boat_model=self.boat_polar) for f in constructed_routes]
|
|
# evaluate
|
|
init_frames = [self.cost_model.evaluate(f)[0] for f in constructed_routes]
|
|
|
|
# throw away invalid
|
|
init_frames = list(filter(lambda frame: frame.valid, init_frames))
|
|
|
|
construction_successful = len(init_frames) != 0
|
|
|
|
if optimization_parameters.verbose:
|
|
print(f"[C] Construction finished with {len(init_frames)} remaining")
|
|
|
|
if not construction_successful:
|
|
for _ in range(optimization_parameters.n_samples):
|
|
frame = self._sample_tree_init(
|
|
self._goal, parameters=optimization_parameters.estimation_parameters
|
|
)
|
|
if frame is None:
|
|
continue
|
|
|
|
frame = frame.remove_single_cycles()
|
|
frame.update_times(boat_model=self.boat_polar)
|
|
evaluated, _ = self.cost_model.evaluate(frame)
|
|
|
|
init_frames.append(evaluated)
|
|
if len(init_frames) >= optimization_parameters.n_break:
|
|
break
|
|
|
|
assert all(f.valid for f in init_frames)
|
|
|
|
if len(init_frames) == 0:
|
|
raise NoRouteFoundException("route discovery failed")
|
|
|
|
# optimize
|
|
log_grad = []
|
|
|
|
for frame in init_frames:
|
|
if custom:
|
|
optimized_frame, list_of_steps = self.optimize_gradient(
|
|
frame,
|
|
step_size_parameter=optimization_parameters.inital_stepsize
|
|
* linalg.norm(self._goal - self.position),
|
|
optimization_parameters=optimization_parameters, # TODO (BEN) here was something missing
|
|
)
|
|
log_grad.append(list_of_steps)
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
optimized_frame = optimized_frame.remove_single_cycles() # todo execute sometimes in iteration?
|
|
optimized_frame.update_times(boat_model=self.boat_polar)
|
|
optimized_frame_evaluated, _ = self.cost_model.evaluate(optimized_frame)
|
|
costs.append(optimized_frame_evaluated.actual_cost)
|
|
final_frames.append(optimized_frame_evaluated)
|
|
|
|
if optimization_parameters.prune:
|
|
for frame in final_frames:
|
|
frame.prune(eps=0.5 / 180 * np.pi)
|
|
frame.update_times(boat_model=self.boat_polar)
|
|
frame, _ = self.cost_model.evaluate(frame)
|
|
|
|
# handle optimization and initialization failure
|
|
if np.amin(costs) > 1e10:
|
|
return None, None
|
|
|
|
# return TimingFrame with minimal cost and the attempts needed to reach it
|
|
return final_frames[np.argmin(costs)], log_grad[np.argmin(costs)]
|
|
|
|
def optimize_gradient( # noqa: C901
|
|
self,
|
|
init_frame: EvaluatedTimingFrame,
|
|
optimization_parameters: OptimizationParameters,
|
|
step_size_parameter: float = 10.0,
|
|
only_attempt: bool = False,
|
|
) -> Tuple[EvaluatedTimingFrame, List[EvaluatedTimingFrame]]:
|
|
"""Optimization routine for gradient based optimization
|
|
|
|
Args:
|
|
only_attempt: if to assure computed route is valid
|
|
init_frame: initial frame to optimized
|
|
step_size_parameter: step size for gradient descend
|
|
optimization_parameters: parameters for optimization bundled in a dataclass
|
|
|
|
Returns:
|
|
optimized :class:`~.timing_frame.TimingFrame`
|
|
"""
|
|
# pylint: disable-msg=too-many-locals
|
|
# pylint: disable-msg=too-many-statements
|
|
step_size: npt.NDArray[np.floating] = step_size_parameter * np.ones(
|
|
(init_frame.points.shape[0] - 2, 2)
|
|
)
|
|
|
|
last_grad: Optional[npt.NDArray[np.floating]] = None
|
|
last_subgoals: Optional[npt.NDArray[np.floating]] = None
|
|
|
|
cost_list = []
|
|
result_list = [init_frame]
|
|
frame = init_frame
|
|
frame.update_times(self.boat_polar)
|
|
|
|
assert init_frame.valid or only_attempt
|
|
|
|
last_cost: Optional[np.floating] = None
|
|
for i in range(optimization_parameters.n_iter_grad):
|
|
cost, grad = cast(Tuple[np.floating, npt.NDArray[np.floating]], self.cost_model.gradients(frame))
|
|
if optimization_parameters.verbose:
|
|
print(
|
|
f" [C] cost {cost} at stepsize {step_size} with gradient <{grad}> \n subgoals {last_subgoals}"
|
|
)
|
|
if self._heading is not None:
|
|
assert self._heading is not None
|
|
# # assert False, grad
|
|
grad[0, :] = self._heading * np.clip(
|
|
np.sum(self._heading * grad[0, :]),
|
|
-10,
|
|
min(10.0, linalg.norm((frame.points[1, :] - self.position) / step_size[0, :])),
|
|
)
|
|
frame.update_times(self.boat_polar)
|
|
|
|
frame_evaluated, _ = self.cost_model.evaluate(frame)
|
|
# step size adaption:
|
|
if optimization_parameters.adaptive_step_size:
|
|
|
|
if last_cost is not None:
|
|
assert last_cost is not None
|
|
if cost - last_cost > 1e-6 or not frame_evaluated.valid:
|
|
result_list.append(frame)
|
|
cost_list.append(cost)
|
|
step_size /= optimization_parameters.adaptive_data_rate_general_regulization_factor
|
|
assert last_grad is not None
|
|
step_size[
|
|
grad * last_grad < 0
|
|
] /= optimization_parameters.adaptive_data_rate_penalty_on_loss_sign_change
|
|
assert last_subgoals is not None and last_grad is not None
|
|
new_subs = cast(
|
|
npt.NDArray[np.floating],
|
|
last_subgoals
|
|
- np.concatenate(
|
|
(
|
|
np.zeros((1, 2)),
|
|
step_size * np.clip(last_grad, -self._grad_lim, self._grad_lim),
|
|
np.zeros((1, 2)),
|
|
),
|
|
axis=0,
|
|
),
|
|
)
|
|
frame = TimingFrame( # type:ignore
|
|
CartesianRoute.from_numpy(new_subs)
|
|
)
|
|
frame.update_times(self.boat_polar)
|
|
# print(f" ->{last_cost}: {last_grad}")
|
|
continue
|
|
|
|
assert last_grad is not None
|
|
|
|
step_size[
|
|
grad * last_grad < 0
|
|
] /= optimization_parameters.adaptive_data_rate_general_regulization_factor
|
|
step_size[
|
|
grad * last_grad > 0
|
|
] *= optimization_parameters.adaptive_data_rate_general_regulization_factor
|
|
|
|
if (step_size < 1e-6).all():
|
|
break
|
|
last_cost = cost
|
|
cost_list.append(cost)
|
|
last_subgoals = frame.points
|
|
result_list.append(frame)
|
|
last_grad = grad
|
|
# print(f" ->last state committed {last_cost}: {last_grad} : {last_subgoals}")
|
|
# print(f"apply grad<{grad}> {step_size * np.clip(grad, -self._grad_lim, self._grad_lim)}")
|
|
new_subs = frame.points - np.concatenate(
|
|
(
|
|
np.zeros((1, 2)),
|
|
step_size * np.clip(grad, -self._grad_lim, self._grad_lim),
|
|
np.zeros((1, 2)),
|
|
),
|
|
axis=0,
|
|
)
|
|
|
|
assert not (np.isnan(new_subs).any()) or only_attempt, grad
|
|
|
|
frame = TimingFrame(CartesianRoute.from_numpy(new_subs)) # type: ignore
|
|
frame.update_times(self.boat_polar)
|
|
|
|
frame_evaluated, actual_cost = self.cost_model.evaluate(frame)
|
|
cost = np.float64(actual_cost)
|
|
|
|
if last_cost is not None and cost - last_cost > 0 or not frame_evaluated.valid: # got worse
|
|
assert last_subgoals is not None
|
|
frame = TimingFrame(CartesianRoute.from_numpy(last_subgoals)) # type: ignore
|
|
frame.update_times(self.boat_polar)
|
|
frame_evaluated, cost = self.cost_model.evaluate(frame) # evaluate the frame
|
|
if not only_attempt:
|
|
assert frame_evaluated.valid
|
|
cost = np.float32(cost)
|
|
|
|
frame_evaluated.actual_cost = cost
|
|
return frame_evaluated, result_list
|
|
|
|
# --- SOLUTION SPACE EXPLORATION --- #
|
|
|
|
@dataclass
|
|
class _RRTNode:
|
|
"""Class describing solution node for initial guess."""
|
|
|
|
position: Point
|
|
time: float
|
|
distance: float
|
|
angle: Optional[float]
|
|
points: List
|
|
node_identifier: int
|
|
|
|
def _sample_tree_init(
|
|
self, goal: Point, parameters: EstimationParameters = EstimationParameters()
|
|
) -> Optional[TimingFrame]:
|
|
"""Initial guessing method implementation
|
|
|
|
Args:
|
|
goal: Shapely Point to guess towards to
|
|
parameters: dictionary of addition parameters
|
|
|
|
Returns:
|
|
initial TimingFrame guesses, None if no TimingFrame found
|
|
"""
|
|
|
|
distance_to_goal = goal.distance(self.position)
|
|
|
|
nodes = [
|
|
self._RRTNode(
|
|
position=self.position,
|
|
time=0,
|
|
distance=distance_to_goal,
|
|
angle=None,
|
|
points=[],
|
|
node_identifier=0,
|
|
)
|
|
]
|
|
segments = []
|
|
|
|
for _ in range(parameters.max_count):
|
|
|
|
new_point = self._sample_new_node(goal, p_goal=parameters.p_goal)
|
|
new_point, node = self._select_expansion_node(
|
|
new_point, nodes, max_len=distance_to_goal * parameters.max_len_relative
|
|
)
|
|
|
|
new_frame = TimingFrame(
|
|
CartesianRoute.from_numpy(
|
|
np.concatenate(
|
|
[utils.shapely_point_to_ndarray(self.position)[None, :]]
|
|
+ [utils.shapely_point_to_ndarray(p)[None, :] for p in node.points]
|
|
+ [utils.shapely_point_to_ndarray(new_point)[None, :]]
|
|
)
|
|
)
|
|
)
|
|
|
|
new_frame.update_times(self.boat_polar)
|
|
|
|
new_frame_evaluated, _ = self.cost_model.evaluate(new_frame)
|
|
|
|
if new_frame_evaluated.valid:
|
|
segments.append(new_frame_evaluated)
|
|
new_distance = goal.distance(new_point)
|
|
if new_distance < 0.1:
|
|
return TimingFrame(
|
|
CartesianRoute.from_numpy(
|
|
np.concatenate(
|
|
[utils.shapely_point_to_ndarray(self.position)[None, :]]
|
|
+ [utils.shapely_point_to_ndarray(p)[None, :] for p in node.points]
|
|
+ [utils.shapely_point_to_ndarray(new_point)[None, :]]
|
|
)
|
|
)
|
|
)
|
|
|
|
new_node = self._RRTNode(
|
|
position=new_point,
|
|
time=new_frame_evaluated.times[-1],
|
|
distance=new_distance,
|
|
angle=None,
|
|
points=node.points + [new_point],
|
|
node_identifier=0,
|
|
)
|
|
nodes.append(new_node)
|
|
|
|
# print("Counter exceeded")
|
|
return None
|
|
|
|
def _sample_new_node(self, goal: Point, p_dir_goal=0.5, p_goal=0.2, std=None) -> Point:
|
|
"""sampling of a new :class:`_RRTNode`
|
|
|
|
Args:
|
|
goal: shapely point to develop towards
|
|
p_dir_goal: probability to choose the goal direction as next direction
|
|
p_goal: probability to choose the goal as next node
|
|
std: minimum derivation from current direction
|
|
"""
|
|
if np.random.rand() < p_goal: # sample the goal position
|
|
return goal
|
|
if np.random.rand() < p_dir_goal:
|
|
mean = goal
|
|
else:
|
|
mean = self.position
|
|
if std is None:
|
|
dist = goal.distance(self.position)
|
|
std = dist / 1.5
|
|
return Point(mean.x + std * np.random.randn(), mean.y + std * np.random.randn())
|
|
|
|
def _select_expansion_node(
|
|
self, new_point: Point, nodes: List[_RRTNode], max_len: float
|
|
) -> Tuple[Point, _RRTNode]:
|
|
"""Selects a new node to expand towards. Takes the :attr:`wind_angle` into account.
|
|
|
|
Args:
|
|
new_point: shapely point describing relative movements towards the goal taken in the next step
|
|
nodes: list of :class:`_RTTNode`\\s which already have been selected in the past
|
|
max_len: The maximum absolute length to walk towards the goal
|
|
|
|
Returns:
|
|
A new point to expand to and the corresponding node. This point now contains the absolute
|
|
positional information of the next point.
|
|
"""
|
|
|
|
def distance_metric(nodes_to_compare: List[Optimizer._RRTNode], point: Point) -> List[float]:
|
|
"""Just a little vectorized version of point.distance().
|
|
|
|
One could have solved that with .vectorize() but this is more declarative.
|
|
|
|
Args:
|
|
nodes_to_compare: nodes to apply metric onto
|
|
point: points to measure towards
|
|
|
|
Returns:
|
|
distance by metric
|
|
"""
|
|
return [point.distance(n.position) for n in nodes_to_compare]
|
|
|
|
# calculate distances from each possible node to current node and determine therefore
|
|
# their probabilities to progress to them
|
|
|
|
distances = distance_metric(nodes, new_point)
|
|
|
|
probabilities = np.nan_to_num(softmax(-10 / np.amin(distances) * np.array(distances)))
|
|
|
|
exp_node: Optimizer._RRTNode = np.random.choice(nodes, p=probabilities) # type: ignore
|
|
|
|
delta_new_exp = utils.shapely_point_to_ndarray(new_point) - utils.shapely_point_to_ndarray(
|
|
exp_node.position
|
|
)
|
|
distance_to_new, angle_toward_new = (
|
|
linalg.norm(delta_new_exp),
|
|
np.arctan2(delta_new_exp[1], delta_new_exp[0]),
|
|
)
|
|
|
|
wind_angle = self._wind_angle # save it for thread safety
|
|
if wind_angle is not None:
|
|
d_angle = (angle_toward_new - wind_angle + np.pi) % (2 * np.pi) - np.pi
|
|
|
|
if d_angle > np.pi * 5 / 6.0:
|
|
angle_toward_new = wind_angle + np.pi * 3 / 4.0
|
|
elif d_angle < -np.pi * 5 / 6.0:
|
|
angle_toward_new = wind_angle - np.pi * 3 / 4.0
|
|
|
|
distance_to_new = min(distance_to_new, max_len)
|
|
|
|
projected_array = np.array(
|
|
[distance_to_new * np.cos(angle_toward_new), distance_to_new * np.sin(angle_toward_new)]
|
|
)
|
|
|
|
new_point_clipped = Point(
|
|
projected_array[0] + exp_node.position.x, projected_array[1] + exp_node.position.y
|
|
)
|
|
return new_point_clipped, exp_node
|
|
|
|
# --- ROUTE CONSTRUCTION --- #
|
|
|
|
def _get_intersecting_obstacle_ids(
|
|
self, route: shapely.geometry.LineString
|
|
) -> List[Tuple[int, Obstacle]]:
|
|
to_return = []
|
|
for i, ob in enumerate(self.cost_model.obstacles.values()):
|
|
if ob.shape.intersects(route):
|
|
to_return.append((i, ob))
|
|
return to_return
|
|
|
|
def _tangent_route_on_obstacle(
|
|
self, route_seg: shapely.geometry.LineString, obstacle_index: int, scale: float = 1
|
|
) -> List[CartesianRoute]:
|
|
|
|
assert scale > 0
|
|
assert scale < 2
|
|
|
|
obstacle = list(self.cost_model.obstacles.values())[obstacle_index]
|
|
distances = self.cost_model.distance_matrix[obstacle_index]
|
|
center = obstacle.shape.centroid
|
|
|
|
min_distance = np.min(distances[np.nonzero(distances)], initial=np.inf)
|
|
if min_distance == np.inf:
|
|
min_distance = 0
|
|
max_distance_from_center = max([ver.distance(center) for ver in obstacle.shape.locations])
|
|
|
|
scale = scale * (min_distance / max_distance_from_center + 1)
|
|
intersection = cast(shapely.geometry.LineString, obstacle.shape.intersection(route_seg))
|
|
boundary_paths: List[CartesianRoute] = []
|
|
for poly in shapely.ops.split(obstacle.shape, intersection):
|
|
poly = cast(shapely.geometry.Polygon, poly)
|
|
_ = poly.exterior.difference(intersection)
|
|
if isinstance(_, shapely.geometry.multilinestring.MultiLineString):
|
|
_ = shapely.ops.linemerge(_)
|
|
_ = cast(shapely.geometry.linestring.LineString, _)
|
|
# noinspection PyUnresolvedReferences
|
|
_ = shapely.affinity.scale(_, xfact=scale, yfact=scale, zfact=scale, origin=center)
|
|
line_coords = list(_.coords)
|
|
|
|
line_array = np.array(line_coords)
|
|
|
|
boundary_paths.append(CartesianRoute.from_numpy(line_array))
|
|
boundary_paths.append(CartesianRoute.from_numpy(line_array[::-1]))
|
|
|
|
return boundary_paths
|
|
|
|
def _construct_init(
|
|
self, goal: Point, parameters: EstimationParameters = EstimationParameters()
|
|
) -> Optional[List[TimingFrame]]:
|
|
"""
|
|
|
|
Args:
|
|
goal:
|
|
parameters:
|
|
|
|
Returns:
|
|
|
|
"""
|
|
|
|
subgoals = np.concatenate(
|
|
(
|
|
utils.shapely_point_to_ndarray(self.position)[None, :],
|
|
utils.shapely_point_to_ndarray(goal)[None, :],
|
|
),
|
|
axis=0,
|
|
)
|
|
|
|
inital_frame: TimingFrame = TimingFrame(CartesianRoute.from_numpy(subgoals))
|
|
inital_frame.update_times(self.boat_polar)
|
|
inital_frame_evaluated: EvaluatedTimingFrame = self.cost_model.evaluate(inital_frame)[0]
|
|
|
|
collision_times = inital_frame_evaluated.collision_times
|
|
|
|
_ = list(collision_times.values())
|
|
collisions_in_order = np.array(list(np.argsort(np.array(_))))
|
|
collisions_in_order = collisions_in_order[: np.sum(np.isfinite(np.array(_)))]
|
|
|
|
frame_parts = {
|
|
i: self._tangent_route_on_obstacle(inital_frame.route, i, scale=1)
|
|
for i, intersecting_obstacle in self._get_intersecting_obstacle_ids(inital_frame.route)
|
|
}
|
|
|
|
if len(frame_parts) == 0:
|
|
route_vec = utils.shapely_point_to_ndarray(goal) - utils.shapely_point_to_ndarray(self.position)
|
|
route_vect_r = route_vec + np.array([-0.01, 0])
|
|
route_vect_l = route_vec + np.array([0.01, 0])
|
|
simple_route = np.concatenate(
|
|
(
|
|
utils.shapely_point_to_ndarray(self.position)[None, :],
|
|
utils.shapely_point_to_ndarray(self.position)[None, :] + 0.2 * route_vect_l[None, :],
|
|
utils.shapely_point_to_ndarray(self.position)[None, :] + 0.4 * route_vect_r[None, :],
|
|
utils.shapely_point_to_ndarray(self.position)[None, :] + 0.6 * route_vect_l[None, :],
|
|
utils.shapely_point_to_ndarray(self.position)[None, :] + 0.8 * route_vect_r[None, :],
|
|
utils.shapely_point_to_ndarray(goal)[None, :],
|
|
),
|
|
axis=0,
|
|
)
|
|
|
|
simple_frame = TimingFrame(CartesianRoute.from_numpy(simple_route))
|
|
|
|
return [simple_frame]
|
|
|
|
# frame_parts[0] exists
|
|
|
|
frames_on_bench = []
|
|
for obs_already_finished, obs_ind in enumerate(collisions_in_order):
|
|
if obs_already_finished == 0:
|
|
for first_part in frame_parts[obs_ind]:
|
|
frames_on_bench.append(TimingFrame(first_part))
|
|
else:
|
|
temp = []
|
|
for obstacle_hull in frame_parts[obs_ind]:
|
|
for old_frame in frames_on_bench:
|
|
temp.append(old_frame.append(obstacle_hull))
|
|
frames_on_bench = temp
|
|
|
|
constructed_route_arrays = [
|
|
np.concatenate(
|
|
(
|
|
utils.shapely_point_to_ndarray(self.position)[None, :],
|
|
f.route.to_numpy(),
|
|
utils.shapely_point_to_ndarray(goal)[None, :],
|
|
),
|
|
axis=0,
|
|
)
|
|
for f in frames_on_bench
|
|
]
|
|
constructed_route: List[TimingFrame] = [
|
|
TimingFrame(CartesianRoute.from_numpy(array)) for array in constructed_route_arrays
|
|
]
|
|
return constructed_route
|