Skip to content

Planning Module

abstract_planner ¤

AbstractPlanner ¤

__init__(self, agent, **kwargs) special ¤

Source code in ROAR/planning_module/abstract_planner.py
def __init__(self, agent, **kwargs):
    super().__init__(**kwargs)
    self.logger = logging
    self.logger = logging.getLogger(__name__)
    self.agent = agent

run_in_series(self, **kwargs) ¤

On every step, produce an actionable plan

Returns:

Type Description
Any
Source code in ROAR/planning_module/abstract_planner.py
@abstractmethod
def run_in_series(self, **kwargs) -> Any:
    """
    On every step, produce an actionable plan
    Returns:
    """
    return None

run_in_threaded(self, **kwargs) ¤

This is the threaded function.

Parameters:

Name Type Description Default
**kwargs {}
Source code in ROAR/planning_module/abstract_planner.py
def run_in_threaded(self, **kwargs):
    pass

save(self, **kwargs) ¤

Source code in ROAR/planning_module/abstract_planner.py
def save(self, **kwargs):
    pass

behavior_planner special ¤

behavior_planner ¤

BehaviorPlanner ¤

__init__(self, agent, **kwargs) special ¤
Source code in ROAR/planning_module/behavior_planner/behavior_planner.py
def __init__(self, agent, **kwargs):
    super().__init__(agent, **kwargs)
run_in_series(self) ¤

On every step, produce an actionable plan

Returns:

Type Description
Any
Source code in ROAR/planning_module/behavior_planner/behavior_planner.py
def run_in_series(self) -> Any:
    pass

local_planner special ¤

floodfill_based_planner ¤

FloodfillBasedPlanner ¤

run_in_series(self) ¤

On every step, produce an actionable plan

Returns:

Type Description
VehicleControl
Source code in ROAR/planning_module/local_planner/floodfill_based_planner.py
def run_in_series(self) -> VehicleControl:
    pass

local_planner ¤

LocalPlanner ¤

__init__(self, agent, controller=None, behavior_planner=None, mission_planner=None, **kwargs) special ¤
Source code in ROAR/planning_module/local_planner/local_planner.py
def __init__(
        self,
        agent,
        controller: Optional[Controller] = None,
        behavior_planner: Optional[BehaviorPlanner] = None,
        mission_planner: Optional[MissionPlanner] = None,
        **kwargs
):
    super().__init__(agent=agent, **kwargs)
    self.controller = (
        Controller(agent=agent) if controller is None else controller
    )
    self.behavior_planner = (
        BehaviorPlanner(agent=agent)
        if behavior_planner is None
        else behavior_planner
    )
    self.mission_planner = (
        MissionPlanner(agent=agent)
        if mission_planner is None
        else mission_planner
    )
    self.way_points_queue = deque()
is_done(self) ¤
Source code in ROAR/planning_module/local_planner/local_planner.py
@abstractmethod
def is_done(self):
    return False
run_in_series(self) ¤

On every step, produce an actionable plan

Returns:

Type Description
VehicleControl
Source code in ROAR/planning_module/local_planner/local_planner.py
@abstractmethod
def run_in_series(self) -> VehicleControl:
    return VehicleControl()

rl_local_planner ¤

RLLocalPlanner ¤

__init__(self, agent, controller) special ¤

Initialize Simple Waypoint Following Planner

Parameters:

Name Type Description Default
agent Agent

newest agent state

required
controller Controller

Control module used

required
Source code in ROAR/planning_module/local_planner/rl_local_planner.py
def __init__(
        self,
        agent: Agent,
        controller: Controller,
):
    """
    Initialize Simple Waypoint Following Planner
    Args:
        agent: newest agent state
        controller: Control module used
    """
    super().__init__(agent=agent, controller=controller)
    self.logger = logging.getLogger("SimplePathFollowingLocalPlanner")
    self.logger.debug("Simple Path Following Local Planner Initiated")
    self.way_points_queue = deque(maxlen=10)
is_done(self) ¤

If there are nothing in self.way_points_queue, that means you have finished a lap, you are done

Returns:

Type Description
bool

True if Done, False otherwise

Source code in ROAR/planning_module/local_planner/rl_local_planner.py
def is_done(self) -> bool:
    """
    If there are nothing in self.way_points_queue,
    that means you have finished a lap, you are done

    Returns:
        True if Done, False otherwise
    """
    return False
run_in_series(self) ¤

On every step, produce an actionable plan

Returns:

Type Description
VehicleControl
Source code in ROAR/planning_module/local_planner/rl_local_planner.py
def run_in_series(self) -> VehicleControl:
    next_waypoint: Transform = self.agent.kwargs.get("next_waypoint", self.agent.vehicle.transform)
    self.way_points_queue.append(next_waypoint)
    control = self.controller.run_in_series(next_waypoint=next_waypoint)
    return control

simple_waypoint_following_local_planner ¤

SimpleWaypointFollowingLocalPlanner ¤

__init__(self, agent, controller, mission_planner, behavior_planner, closeness_threshold=0.5) special ¤

Initialize Simple Waypoint Following Planner

Parameters:

Name Type Description Default
agent Agent

newest agent state

required
controller Controller

Control module used

required
mission_planner MissionPlanner

mission planner used

required
behavior_planner BehaviorPlanner

behavior planner used

required
closeness_threshold

how close can a waypoint be with the vehicle

0.5
Source code in ROAR/planning_module/local_planner/simple_waypoint_following_local_planner.py
def __init__(
        self,
        agent: Agent,
        controller: Controller,
        mission_planner: MissionPlanner,
        behavior_planner: BehaviorPlanner,
        closeness_threshold=0.5,
):
    """
    Initialize Simple Waypoint Following Planner
    Args:
        agent: newest agent state
        controller: Control module used
        mission_planner: mission planner used
        behavior_planner: behavior planner used
        closeness_threshold: how close can a waypoint be with the vehicle
    """
    super().__init__(agent=agent,
                     controller=controller,
                     mission_planner=mission_planner,
                     behavior_planner=behavior_planner,
                     )
    self.logger = logging.getLogger("SimplePathFollowingLocalPlanner")
    self.set_mission_plan()
    self.logger.debug("Simple Path Following Local Planner Initiated")
    self.closeness_threshold = closeness_threshold
    self.closeness_threshold_config = json.load(Path(
        agent.agent_settings.simple_waypoint_local_planner_config_file_path).open(mode='r'))
is_done(self) ¤

If there are nothing in self.way_points_queue, that means you have finished a lap, you are done

Returns:

Type Description
bool

True if Done, False otherwise

Source code in ROAR/planning_module/local_planner/simple_waypoint_following_local_planner.py
def is_done(self) -> bool:
    """
    If there are nothing in self.way_points_queue,
    that means you have finished a lap, you are done

    Returns:
        True if Done, False otherwise
    """
    return len(self.way_points_queue) == 0
run_in_series(self) ¤

Procedure

  1. Sync data
  2. get the correct look ahead for current speed
  3. get the correct next waypoint
  4. feed waypoint into controller
  5. return result from controller

Returns:

Type Description
VehicleControl

next control that the local think the agent should execute.

Source code in ROAR/planning_module/local_planner/simple_waypoint_following_local_planner.py
def run_in_series(self) -> VehicleControl:
    """
    Run step for the local planner
    Procedure:
        1. Sync data
        2. get the correct look ahead for current speed
        3. get the correct next waypoint
        4. feed waypoint into controller
        5. return result from controller

    Returns:
        next control that the local think the agent should execute.
    """
    if (
            len(self.mission_planner.mission_plan) == 0
            and len(self.way_points_queue) == 0
    ):
        return VehicleControl()

    # get vehicle's location
    vehicle_transform: Union[Transform, None] = self.agent.vehicle.transform

    if vehicle_transform is None:
        raise AgentException("I do not know where I am, I cannot proceed forward")

    # redefine closeness level based on speed
    self.set_closeness_threhold(self.closeness_threshold_config)

    # get current waypoint
    curr_closest_dist = float("inf")
    while True:
        if len(self.way_points_queue) == 0:
            self.logger.info("Destination reached")
            return VehicleControl()
        waypoint: Transform = self.way_points_queue[0]
        curr_dist = vehicle_transform.location.distance(waypoint.location)
        if curr_dist < curr_closest_dist:
            # if i find a waypoint that is closer to me than before
            # note that i will always enter here to start the calculation for curr_closest_dist
            curr_closest_dist = curr_dist
        elif curr_dist < self.closeness_threshold:
            # i have moved onto a waypoint, remove that waypoint from the queue
            self.way_points_queue.popleft()
        else:
            break

    target_waypoint = self.way_points_queue[0]
    control: VehicleControl = self.controller.run_in_series(next_waypoint=target_waypoint)
    # self.logger.debug(f"\n"
    #                   f"Curr Transform: {self.agent.vehicle.transform}\n"
    #                   f"Target Location: {target_waypoint.location}\n"
    #                   f"Control: {control} | Speed: {Vehicle.get_speed(self.agent.vehicle)}\n")
    return control
set_closeness_threhold(self, config) ¤
Source code in ROAR/planning_module/local_planner/simple_waypoint_following_local_planner.py
def set_closeness_threhold(self, config: dict):
    curr_speed = Vehicle.get_speed(self.agent.vehicle)
    for speed_upper_bound, closeness_threshold in config.items():
        speed_upper_bound = float(speed_upper_bound)
        if curr_speed < speed_upper_bound:
            self.closeness_threshold = closeness_threshold
            break
set_mission_plan(self) ¤

Clears current waypoints, and reset mission plan from start I am simply transferring the mission plan into my waypoint queue. Assuming that this current run will run all the way to the end

Returns:

Type Description
None

None

Source code in ROAR/planning_module/local_planner/simple_waypoint_following_local_planner.py
def set_mission_plan(self) -> None:
    """
    Clears current waypoints, and reset mission plan from start
    I am simply transferring the mission plan into my waypoint queue.
    Assuming that this current run will run all the way to the end

    Returns:
        None
    """
    self.way_points_queue.clear()
    while (
            self.mission_planner.mission_plan
    ):  # this actually clears the mission plan!!
        self.way_points_queue.append(self.mission_planner.mission_plan.popleft())

smooth_waypoint_following_local_planner ¤

SmoothWaypointFollowingLocalPlanner ¤

Waypoint following local planner with waypoint lookahead for smoothing and target speed reduction.

next_waypoint_smooth_and_speed(self, smooth_lookahead=400, speed_lookahead=600, min_speed_multiplier=0.6, speed_multiplier_slope=1.3) ¤

Calculate the next target waypoint and speed for the controller.

Parameters smooth_lookahead : int Number of waypoints ahead to look at to compute the smoothed waypoint. speed_lookahead : int Number of waypoint to look ahaed to compute speed factor. min_speed_multiplier : float The minimum value for the speed multiplier. speed_multiplier_slope : float The rate of speed multiplier decrease for every 180 degrees of angle error.

Returns target_waypoint : Transform The next target waypoint for the controller speed_multiplier : float The speed multiplier for the controller's target speed.

Source code in ROAR/planning_module/local_planner/smooth_waypoint_following_local_planner.py
def next_waypoint_smooth_and_speed(self, smooth_lookahead=400, speed_lookahead=600,
                                   min_speed_multiplier=0.6, speed_multiplier_slope=1.3) -> (Transform, float):
    """
    Calculate the next target waypoint and speed for the controller.

    Parameters
    smooth_lookahead : int
        Number of waypoints ahead to look at to compute the smoothed waypoint.
    speed_lookahead : int
        Number of waypoint to look ahaed to compute speed factor.
    min_speed_multiplier : float
        The minimum value for the speed multiplier.
    speed_multiplier_slope : float
        The rate of speed multiplier decrease for every 180 degrees of angle error.

    Returns
    target_waypoint : Transform
        The next target waypoint for the controller
    speed_multiplier : float
        The speed multiplier for the controller's target speed.
    """

    smooth_lookahead = min(smooth_lookahead, len(self.way_points_queue) - 1)
    speed_lookahead = min(speed_lookahead, len(self.way_points_queue) - 1)

    if smooth_lookahead > 10:  # Reduce computation by only looking at every 10 steps ahead
        sample_points = range(0, smooth_lookahead, smooth_lookahead // 10)
        location_sum = reduce(lambda x, y: x + y,
                              (self.way_points_queue[i].location for i in sample_points))
        rotation_sum = reduce(lambda x, y: x + y,
                              (self.way_points_queue[i].rotation for i in sample_points))

        num_points = len(sample_points)
        target_waypoint = Transform(location=location_sum / num_points, rotation=rotation_sum / num_points)
    else:
        target_waypoint = self.way_points_queue[-1]

    if speed_lookahead > 0:
        angle_difference = self._calculate_angle_error(self.way_points_queue[speed_lookahead])
        # Angle difference is between 0 and 180, but unlikely to be more than 90
        speed_multiplier = max(min_speed_multiplier,
                               (1.0 - speed_multiplier_slope * angle_difference / np.pi))
    else:
        speed_multiplier = 1.0

    return target_waypoint, speed_multiplier
run_in_series(self) ¤

Procedure

  1. Sync data
  2. get the correct look ahead for current speed
  3. get the correct next waypoint
  4. feed waypoint into controller
  5. return result from controller

Returns:

Type Description
VehicleControl

next control that the local think the agent should execute.

Source code in ROAR/planning_module/local_planner/smooth_waypoint_following_local_planner.py
def run_in_series(self) -> VehicleControl:
    """
    Run step for the local planner
    Procedure:
        1. Sync data
        2. get the correct look ahead for current speed
        3. get the correct next waypoint
        4. feed waypoint into controller
        5. return result from controller

    Returns:
        next control that the local think the agent should execute.
    """
    if (
            len(self.mission_planner.mission_plan) == 0
            and len(self.way_points_queue) == 0
    ):
        return VehicleControl()

    # get vehicle's location
    vehicle_transform: Union[Transform, None] = self.agent.vehicle.transform

    if vehicle_transform is None:
        raise AgentException("I do not know where I am, I cannot proceed forward")

    # redefine closeness level based on speed
    self.set_closeness_threhold(self.closeness_threshold_config)

    # get current waypoint
    curr_closest_dist = float("inf")
    while True:
        if len(self.way_points_queue) == 0:
            self.logger.info("Destination reached")
            return VehicleControl()
        # waypoint: Transform = self.way_points_queue[0]
        waypoint, speed_factor = self.next_waypoint_smooth_and_speed()
        curr_dist = vehicle_transform.location.distance(waypoint.location)
        if curr_dist < curr_closest_dist:
            # if i find a waypoint that is closer to me than before
            # note that i will always enter here to start the calculation for curr_closest_dist
            curr_closest_dist = curr_dist
        elif curr_dist < self.closeness_threshold:
            # i have moved onto a waypoint, remove that waypoint from the queue
            self.way_points_queue.popleft()
        else:
            break

    target_waypoint, speed_factor = self.next_waypoint_smooth_and_speed()
    control: VehicleControl = self.controller.run_in_series(next_waypoint=target_waypoint,
                                                            speed_multiplier=speed_factor)
    self.logger.debug(f"\n"
                      f"Curr Transform: {self.agent.vehicle.transform}\n"
                      f"Target Location: {target_waypoint.location}\n"
                      f"Control: {control} | Speed: {Vehicle.get_speed(self.agent.vehicle)}\n")
    return control

mission_planner special ¤

json_waypoint_planner ¤

JSONWaypointPlanner ¤

__init__(self, agent) special ¤
Source code in ROAR/planning_module/mission_planner/json_waypoint_planner.py
def __init__(self, agent: Agent):
    super().__init__(agent=agent)
    self.file_path: Path = Path(self.agent.agent_config.json_waypoint_file_path)
    self.mission_plan: deque = self.run_in_series()
run_in_series(self) ¤

Abstract run step function for Mission Planner

Parameters:

Name Type Description Default
vehicle

new vehicle state

required

Returns:

Type Description
deque

Plan for next steps

Source code in ROAR/planning_module/mission_planner/json_waypoint_planner.py
def run_in_series(self) -> deque:
    result = deque()
    map_entries = self._read_data_file()
    for m in map_entries:
        result.append(self._map_entry_to_transform(map_entry=m))
    return result

mission_planner ¤

MissionPlanner ¤

__init__(self, agent, **kwargs) special ¤
Source code in ROAR/planning_module/mission_planner/mission_planner.py
def __init__(self, agent, **kwargs):
    super().__init__(agent=agent,**kwargs)
    self.logger = logging.getLogger(__name__)
    self.mission_plan: deque = deque()
run_in_series(self) ¤

Abstract run step function for Mission Planner

Parameters:

Name Type Description Default
vehicle

new vehicle state

required

Returns:

Type Description
List[ROAR.utilities_module.data_structures_models.Transform]

Plan for next steps

Source code in ROAR/planning_module/mission_planner/mission_planner.py
def run_in_series(self) -> List[Transform]:
    """
    Abstract run step function for Mission Planner

    Args:
        vehicle: new vehicle state

    Returns:
        Plan for next steps

    """
    return []

waypoint_following_mission_planner ¤

WaypointFollowingMissionPlanner ¤

A mission planner that takes in a file that contains x,y,z coordinates, formulate into carla.Transform

__init__(self, agent) special ¤
Source code in ROAR/planning_module/mission_planner/waypoint_following_mission_planner.py
def __init__(self, agent: Agent):
    super().__init__(agent=agent)
    self.logger = logging.getLogger(__name__)
    self.file_path: Path = Path(self.agent.agent_settings.waypoint_file_path)
    self.mission_plan = self.produce_mission_plan()
    self.logger.debug("Path Following Mission Planner Initiated.")
produce_mission_plan(self) ¤

Generates a list of waypoints based on the input file path :return a list of waypoint

Source code in ROAR/planning_module/mission_planner/waypoint_following_mission_planner.py
def produce_mission_plan(self) -> deque:
    """
    Generates a list of waypoints based on the input file path
    :return a list of waypoint
    """
    raw_path: List[List[float]] = self._read_data_file()
    length = self.agent.agent_settings.num_laps * len(raw_path)
    mission_plan = deque(maxlen=length)
    for coord in np.tile(raw_path, (self.agent.agent_settings.num_laps, 1)):
        if len(coord) == 3 or len(coord) == 6:
            mission_plan.append(self._raw_coord_to_transform(coord))
    self.logger.debug(f"Computed Mission path of length [{len(mission_plan)}]")
    return mission_plan
produce_single_lap_mission_plan(self) ¤
Source code in ROAR/planning_module/mission_planner/waypoint_following_mission_planner.py
def produce_single_lap_mission_plan(self):
    raw_path: List[List[float]] = self._read_data_file()
    mission_plan = deque(maxlen=len(raw_path))
    for coord in raw_path:
        if len(coord) == 3 or len(coord) == 6:
            mission_plan.append(self._raw_coord_to_transform(coord))
    self.logger.debug(f"Computed Mission path of length [{len(mission_plan)}]")
    return mission_plan
run_in_series(self) ¤

Regenerate waypoints from file Find the waypoint that is closest to the current vehicle location. return a mission plan starting from that waypoint

Parameters:

Name Type Description Default
vehicle

current state of the vehicle

required

Returns:

Type Description
deque

mission plan that start from the current vehicle location

Source code in ROAR/planning_module/mission_planner/waypoint_following_mission_planner.py
def run_in_series(self) -> deque:
    """
    Regenerate waypoints from file
    Find the waypoint that is closest to the current vehicle location.
    return a mission plan starting from that waypoint

    Args:
        vehicle: current state of the vehicle

    Returns:
        mission plan that start from the current vehicle location
    """
    super(WaypointFollowingMissionPlanner, self).run_in_series()
    return self.produce_mission_plan()