Skip to content

Control Module

Controller ¤

__init__(self, agent, **kwargs) special ¤

Source code in ROAR/control_module/controller.py
def __init__(self, agent, **kwargs):
    super().__init__(**kwargs)
    self.agent = agent
    self.logger = logging.getLogger("Controller")

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
VehicleControl

VehicleControl

Source code in ROAR/control_module/controller.py
@abstractmethod
def run_in_series(self, next_waypoint: Transform, **kwargs) \
        -> VehicleControl:
    """
    Abstract function for run step

    Args:
        next_waypoint: next waypoint
        **kwargs:

    Returns:
        VehicleControl
    """
    return VehicleControl()

run_in_threaded(self, **kwargs) ¤

This is the threaded function.

Parameters:

Name Type Description Default
**kwargs {}
Source code in ROAR/control_module/controller.py
def run_in_threaded(self, **kwargs):
    pass

save(self, **kwargs) ¤

Source code in ROAR/control_module/controller.py
def save(self, **kwargs):
    pass

LQRController ¤

__init__(self, agent, steering_boundary, throttle_boundary, **kwargs) special ¤

Source code in ROAR/control_module/lqr_controller.py
def __init__(self, agent, steering_boundary: Tuple[float, float],
             throttle_boundary: Tuple[float, float], **kwargs):
    super().__init__(agent, **kwargs)
    self.max_speed = self.agent.agent_settings.max_speed
    self.throttle_boundary = throttle_boundary
    self.steering_boundary = steering_boundary

    # load in system matrices
    self.config = json.load(Path(agent.agent_settings.lqr_config_file_path).open(mode='r'))
    self.A = np.array(self.config['A'])
    self.B = np.array(self.config['B'])
    self.Q = np.array(self.config['Q'])
    self.R = np.array(self.config['R'])
    # calculate our feedback matrix
    self.P, self.K = self._dlqr(self.A,
                                self.B,
                                self.Q,
                                self.R)

    # some reactive speed control stuff
    self.errBoi = 0
    self.errAlpha = self.config['errAlpha']
    self.slowdown = self.config['slowdown']
    self.maxSlow = self.config['maxSlow']

    self.logger = logging.getLogger(__name__)

run_in_series(self, next_waypoint, speed_multiplier=1.0, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
VehicleControl

VehicleControl

Source code in ROAR/control_module/lqr_controller.py
def run_in_series(self, next_waypoint: Transform, speed_multiplier=1.0, **kwargs) -> VehicleControl:
    # Calculate the current angle to the next waypoint
    angBoi = -self._calculate_angle_error(next_waypoint=next_waypoint)
    # Grab our current speed
    curSpeed = Vehicle.get_speed(self.agent.vehicle)
    # Toss both values into a current xt
    xt = np.array([angBoi, curSpeed])

    # Generate our target speed with reactive speed reduction when off track
    target_speed = min(self.max_speed, kwargs.get("target_speed", self.max_speed)) * speed_multiplier
    # if we are very off track, update error to reflect that
    absErr = np.abs(angBoi)
    if absErr > self.errBoi:
        self.errBoi = absErr
    else: # if we are getting back on track, gradually reduce our error 
        self.errBoi = self.errBoi*(1-self.errAlpha) + absErr*self.errAlpha
    # reduce our target speed based on how far off target we are
    # target_speed *= (math.exp(-self.errBoi) - 1) * self.slowdown + 1
    target_speed *= max((math.cos(self.errBoi) - 1) * self.slowdown, -self.maxSlow) + 1

    ## Note for future: It may be helpful to have another module for adaptive speed control and some way to slowly
    ## increase the target speed when we can.

    # Assume we want to go in the direction of the waypoint at the target speed foreversies
    xd = np.array([0, target_speed])
    cur_speed = Vehicle.get_speed(self.agent.vehicle)
    cd = np.array([0, cur_speed])
    # Calculate the feasible ud trajectory
    ud,_,_,_ = np.linalg.lstsq(self.B, xd-np.dot(self.A, cd), rcond=None)

    # convert to offset variables zt and ht
    zt = xt - xd
    ht = -np.dot(self.K, zt)
    # convert back to ut and clip our inputs
    ut = ht + ud
    steering = np.clip(ut[0], self.steering_boundary[0], self.steering_boundary[1])
    throttle = np.clip(ut[1], self.throttle_boundary[0], self.throttle_boundary[1])

    return VehicleControl(steering=steering, throttle=throttle)

VehicleMPCController ¤

__init__(self, agent, route_file_path, target_speed=inf, steps_ahead=10, max_throttle=1, max_steering=1, dt=0.1) special ¤

Source code in ROAR/control_module/mpc_controller.py
def __init__(self,
             agent: Agent,
             route_file_path: Path,  # read in route
             target_speed=float("inf"),
             steps_ahead=10,
             max_throttle=1,
             max_steering=1,
             dt=0.1):
    super().__init__(agent=agent)
    self.logger = logging.getLogger(__name__)
    # Read in route file
    self.track_DF = pd.read_csv(route_file_path, header=None)
    # Fit the route to a curve
    spline_points = 10000
    self.pts_2D = self.track_DF.loc[:, [0, 1]].values
    tck, u = splprep(self.pts_2D.T, u=None, s=2.0, per=1, k=3)
    u_new = np.linspace(u.min(), u.max(), spline_points)
    x_new, y_new = splev(u_new, tck, der=0)
    self.pts_2D = np.c_[x_new, y_new]

    # Modified parm
    self.prev_cte = 0

    self.target_speed = target_speed
    self.state_vars = ('x', 'y', 'v', 'ψ', 'cte', 'eψ')

    self.steps_ahead = steps_ahead
    self.dt = dt

    # Cost function coefficients
    self.cte_coeff = 100  # 100
    self.epsi_coeff = 100  # 100
    self.speed_coeff = 0.4  # 0.2
    self.acc_coeff = 1  # 1
    self.steer_coeff = 0.1  # 0.1
    self.consec_acc_coeff = 50
    self.consec_steer_coeff = 50

    # Front wheel L
    self.Lf = 2.5

    # How the polynomial fitting the desired curve is fitted
    self.steps_poly = 30  # modify to 3 when using 3D data
    self.poly_degree = 3

    # Bounds for the optimizer
    self.bounds = (
            6 * self.steps_ahead * [(None, None)]
            + self.steps_ahead * [(0, max_throttle)]  # throttle bounds
            + self.steps_ahead * [(-max_steering, max_steering)]
        # steer bounds
    )

    # State 0 placeholder
    num_vars = (len(
        self.state_vars) + 2)  # State variables and two actuators
    self.state0 = np.zeros(self.steps_ahead * num_vars)

    # Lambdify and minimize stuff
    self.evaluator = 'numpy'
    self.tolerance = 1
    self.cost_func, self.cost_grad_func, self.constr_funcs = \
        self.get_func_constraints_and_bounds()

    # To keep the previous state
    self.steer = 0
    self.throttle = 0

    self.logger.debug("MPC Controller initiated")

clip_throttle(throttle, curr_speed, target_speed) staticmethod ¤

Source code in ROAR/control_module/mpc_controller.py
@staticmethod
def clip_throttle(throttle, curr_speed, target_speed):
    return np.clip(
        throttle - 0.01 * (curr_speed - target_speed),
        0.4,
        0.9
    )

create_array_of_symbols(str_symbol, N) staticmethod ¤

Source code in ROAR/control_module/mpc_controller.py
@staticmethod
def create_array_of_symbols(str_symbol, N):
    return sym.symbols('{symbol}0:{N}'.format(symbol=str_symbol, N=N))

generate_fun(self, symb_fun, vars_, init, poly) ¤

Generates a function of the form fun(x, *args)

Source code in ROAR/control_module/mpc_controller.py
def generate_fun(self, symb_fun, vars_, init, poly):
    """
    Generates a function of the form `fun(x, *args)`
    """
    args = init + poly
    return sym.lambdify((vars_, *args), symb_fun, self.evaluator)

generate_grad(self, symb_fun, vars_, init, poly) ¤

TODO: add comments

Source code in ROAR/control_module/mpc_controller.py
def generate_grad(self, symb_fun, vars_, init, poly):
    """
    TODO: add comments
    """
    args = init + poly
    return sym.lambdify(
        (vars_, *args),
        derive_by_array(symb_fun, vars_ + args)[:len(vars_)],
        self.evaluator
    )

get_closest_waypoint_index_2D(self, car_location, waypoint_location) ¤

Get the index of the closest waypoint in self.pts_2D Note: it may give wrong index when the route is overlapped

Source code in ROAR/control_module/mpc_controller.py
def get_closest_waypoint_index_2D(self, car_location, waypoint_location):
    """Get the index of the closest waypoint in self.pts_2D
        Note: it may give wrong index when the route is overlapped
    """
    location_arr = np.array([
        car_location.x,
        car_location.y
    ])
    dists = np.linalg.norm(self.pts_2D - location_arr, axis=1)
    return np.argmin(dists)

get_closest_waypoint_index_3D(self, car_location, waypoint_location) ¤

Get the index of the closest waypoint in self.track_DF car_location: current car location waypoint_location: next_waypoint

Source code in ROAR/control_module/mpc_controller.py
def get_closest_waypoint_index_3D(self, car_location, waypoint_location):
    """Get the index of the closest waypoint in self.track_DF
        car_location: current car location
        waypoint_location: next_waypoint
    """
    index = self.track_DF.loc[(self.track_DF[0] == waypoint_location.x)
                              & (self.track_DF[
                                     1] == waypoint_location.y)].index
    if len(index) > 0:
        return index[0]
    else:
        location_arr = np.array([
            car_location.x,
            car_location.y,
            car_location.z,
        ])
        dists = np.linalg.norm(self.track_DF - location_arr, axis=1)
        return np.argmin(dists)

get_func_constraints_and_bounds(self) ¤

Defines MPC's cost function and constraints.

Source code in ROAR/control_module/mpc_controller.py
def get_func_constraints_and_bounds(self):
    """
    Defines MPC's cost function and constraints.
    """
    # Polynomial coefficients will also be symbolic variables
    poly = self.create_array_of_symbols('poly', self.poly_degree + 1)

    # Initialize the initial state
    x_init = sym.symbols('x_init')
    y_init = sym.symbols('y_init')
    ψ_init = sym.symbols('ψ_init')
    v_init = sym.symbols('v_init')
    cte_init = sym.symbols('cte_init')
    eψ_init = sym.symbols('eψ_init')

    init = (x_init, y_init, ψ_init, v_init, cte_init, eψ_init)

    # State variables
    x = self.create_array_of_symbols('x', self.steps_ahead)
    y = self.create_array_of_symbols('y', self.steps_ahead)
    ψ = self.create_array_of_symbols('ψ', self.steps_ahead)
    v = self.create_array_of_symbols('v', self.steps_ahead)
    cte = self.create_array_of_symbols('cte', self.steps_ahead)
     = self.create_array_of_symbols('eψ', self.steps_ahead)

    # Actuators
    a = self.create_array_of_symbols('a', self.steps_ahead)
    δ = self.create_array_of_symbols('δ', self.steps_ahead)

    vars_ = (
        # Symbolic arrays (but NOT actuators)
        *x, *y, *ψ, *v, *cte, *,

        # Symbolic arrays (actuators)
        *a, *δ,
    )

    cost = 0
    for t in range(self.steps_ahead):
        cost += (
            # Reference state penalties
                self.cte_coeff * cte[t] ** 2
                + self.epsi_coeff * [t] ** 2 +
                + self.speed_coeff * (v[t] - self.target_speed) ** 2

                # Actuator penalties
                + self.acc_coeff * a[t] ** 2
                + self.steer_coeff * δ[t] ** 2
        )

    # Penalty for differences in consecutive actuators
    for t in range(self.steps_ahead - 1):
        cost += (
                self.consec_acc_coeff * (a[t + 1] - a[t]) ** 2
                + self.consec_steer_coeff * (δ[t + 1] - δ[t]) ** 2
        )

    # Initialize constraints
    eq_constr = _EqualityConstraints(self.steps_ahead, self.state_vars)
    eq_constr['x'][0] = x[0] - x_init
    eq_constr['y'][0] = y[0] - y_init
    eq_constr['ψ'][0] = ψ[0] - ψ_init
    eq_constr['v'][0] = v[0] - v_init
    eq_constr['cte'][0] = cte[0] - cte_init
    eq_constr['eψ'][0] = [0] - eψ_init

    for t in range(1, self.steps_ahead):
        curve = sum(
            poly[-(i + 1)] * x[t - 1] ** i for i in range(len(poly)))
        # The desired ψ is equal to the derivative of the polynomial
        # curve at
        #  point x[t-1]
        ψdes = sum(poly[-(i + 1)] * i * x[t - 1] ** (i - 1) for i in
                   range(1, len(poly)))

        eq_constr['x'][t] = x[t] - (
                x[t - 1] + v[t - 1] * sym.cos(ψ[t - 1]) * self.dt)
        eq_constr['y'][t] = y[t] - (
                y[t - 1] + v[t - 1] * sym.sin(ψ[t - 1]) * self.dt)
        eq_constr['ψ'][t] = ψ[t] - (
                ψ[t - 1] - v[t - 1] * δ[t - 1] / self.Lf * self.dt)
        eq_constr['v'][t] = v[t] - (v[t - 1] + a[t - 1] * self.dt)
        eq_constr['cte'][t] = cte[t] - (
                curve - y[t - 1] + v[t - 1] * sym.sin(
            [t - 1]) * self.dt)
        eq_constr['eψ'][t] = [t] - (ψ[t - 1] - ψdes - v[t - 1] * δ[
            t - 1] / self.Lf * self.dt)

    # Generate actual functions from
    cost_func = self.generate_fun(cost, vars_, init, poly)
    cost_grad_func = self.generate_grad(cost, vars_, init, poly)

    constr_funcs = []
    for symbol in self.state_vars:
        for t in range(self.steps_ahead):
            func = self.generate_fun(eq_constr[symbol][t], vars_, init,
                                     poly)
            grad_func = self.generate_grad(eq_constr[symbol][t], vars_,
                                           init, poly)
            constr_funcs.append(
                {'type': 'eq', 'fun': func, 'jac': grad_func,
                 'args': None},
            )

    return cost_func, cost_grad_func, constr_funcs

get_state0(self, v, cte, epsi, a, delta, poly) ¤

Source code in ROAR/control_module/mpc_controller.py
def get_state0(self, v, cte, epsi, a, delta, poly):
    a = a or 0
    delta = delta or 0

    x = np.linspace(0, 1, self.steps_ahead)
    y = np.polyval(poly, x)
    psi = 0

    self.state0[:self.steps_ahead] = x
    self.state0[self.steps_ahead:2 * self.steps_ahead] = y
    self.state0[2 * self.steps_ahead:3 * self.steps_ahead] = psi
    self.state0[3 * self.steps_ahead:4 * self.steps_ahead] = v
    self.state0[4 * self.steps_ahead:5 * self.steps_ahead] = cte
    self.state0[5 * self.steps_ahead:6 * self.steps_ahead] = epsi
    self.state0[6 * self.steps_ahead:7 * self.steps_ahead] = a
    self.state0[7 * self.steps_ahead:8 * self.steps_ahead] = delta
    return self.state0

minimize_cost(self, bounds, x0, init) ¤

Source code in ROAR/control_module/mpc_controller.py
def minimize_cost(self, bounds, x0, init):
    for constr_func in self.constr_funcs:
        constr_func['args'] = init

    return minimize(
        fun=self.cost_func,
        x0=x0,
        args=init,
        jac=self.cost_grad_func,
        bounds=bounds,
        constraints=self.constr_funcs,
        method='SLSQP',
        tol=self.tolerance,
    )

run_in_series(self, next_waypoint) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs required

Returns:

Type Description
VehicleControl

VehicleControl

Source code in ROAR/control_module/mpc_controller.py
def run_in_series(self, next_waypoint: Transform) -> VehicleControl:
    super(VehicleMPCController, self).run_in_series(next_waypoint)
    # get vehicle location (x, y)
    # location = self.vehicle.transform.location

    location = self.agent.vehicle.transform.location
    x, y = location.x, location.y
    # get vehicle rotation
    # rotation = self.vehicle.transform.rotation
    rotation = self.agent.vehicle.transform.rotation
    ψ = rotation.yaw / 180 * np.pi  # transform into radient
    cos_ψ = np.cos(ψ)
    sin_ψ = np.sin(ψ)
    # get vehicle speed
    # v = Vehicle.get_speed(self.vehicle)
    v = Vehicle.get_speed(self.agent.vehicle)
    # get next waypoint location
    wx, wy = next_waypoint.location.x, next_waypoint.location.y
    # debug logging
    # self.logger.debug(f"car location:  ({x}, {y})")
    # self.logger.debug(f"car ψ: {ψ}")
    # self.logger.debug(f"car speed: {v}")
    # self.logger.debug(f"next waypoint: ({wx}, {wy})")

    ### 3D ###
    # get the index of next waypoint
    # waypoint_index = self.get_closest_waypoint_index_3D(location,
    # next_waypoint.location)
    # # find more waypoints index to fit a polynomial
    # waypoint_index_shifted = waypoint_index - 2
    # indeces = waypoint_index_shifted + self.steps_poly * np.arange(
    # self.poly_degree + 1)
    # indeces = indeces % self.track_DF.shape[0]
    # # get waypoints for polynomial fitting
    # pts = np.array([[self.track_DF.iloc[i][0], self.track_DF.iloc[i][
    # 1]] for i in indeces])

    ### 2D ###
    index_2D = self.get_closest_waypoint_index_2D(location,
                                                  next_waypoint.location)
    index_2D_shifted = index_2D - 5
    indeces_2D = index_2D_shifted + self.steps_poly * np.arange(
        self.poly_degree + 1)
    indeces_2D = indeces_2D % self.pts_2D.shape[0]
    pts = self.pts_2D[indeces_2D]

    # self.logger.debug(f'\nwaypoint index:\n  {index_2D}')
    # self.logger.debug(f'\nindeces:\n  {indeces_2D}')

    # transform waypoints from world to car coorinate
    pts_car = VehicleMPCController.transform_into_cars_coordinate_system(
        pts,
        x,
        y,
        cos_ψ,
        sin_ψ
    )
    # fit the polynomial
    poly = np.polyfit(pts_car[:, 0], pts_car[:, 1], self.poly_degree)

    # Debug
    # self.logger.debug(f'\nwaypoint index:\n  {waypoint_index}')
    # self.logger.debug(f'\nindeces:\n  {indeces}')
    # self.logger.debug(f'\npts for poly_fit:\n  {pts}')
    # self.logger.debug(f'\npts_car:\n  {pts_car}')

    ###########

    cte = poly[-1]
     = -np.arctan(poly[-2])

    init = (0, 0, 0, v, cte, , *poly)
    self.state0 = self.get_state0(v, cte, , self.steer, self.throttle,
                                  poly)
    result = self.minimize_cost(self.bounds, self.state0, init)

    # self.steer = -0.6 * cte - 5.5 * (cte - self.prev_cte)
    # self.prev_cte = cte
    # self.throttle = VehicleMPCController.clip_throttle(self.throttle,
    # v, self.target_speed)

    control = VehicleControl()
    if 'success' in result.message:
        self.steer = result.x[-self.steps_ahead]
        self.throttle = result.x[-2 * self.steps_ahead]
    else:
        self.logger.debug('Unsuccessful optimization')

    control.steering = self.steer
    control.throttle = self.throttle

    return control

transform_into_cars_coordinate_system(pts, x, y, cos_ψ, sin_ψ) staticmethod ¤

Source code in ROAR/control_module/mpc_controller.py
@staticmethod
def transform_into_cars_coordinate_system(pts, x, y, cos_ψ, sin_ψ):
    diff = (pts - [x, y])
    pts_car = np.zeros_like(diff)
    pts_car[:, 0] = cos_ψ * diff[:, 0] + sin_ψ * diff[:, 1]
    pts_car[:, 1] = sin_ψ * diff[:, 0] - cos_ψ * diff[:, 1]
    return pts_car

LatPIDController ¤

__init__(self, agent, config, steering_boundary, dt=0.03, **kwargs) special ¤

Source code in ROAR/control_module/pid_controller.py
def __init__(self, agent, config: dict, steering_boundary: Tuple[float, float],
             dt: float = 0.03, **kwargs):
    super().__init__(agent, **kwargs)
    self.config = config
    self.steering_boundary = steering_boundary
    self._error_buffer = deque(maxlen=10)
    self._dt = dt

run_in_series(self, next_waypoint, **kwargs) ¤

Calculates a vector that represent where you are going.

Parameters:

Name Type Description Default
next_waypoint Transform required
**kwargs {}

Returns:

Type Description
float

lat_control

Source code in ROAR/control_module/pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> float:
    """
    Calculates a vector that represent where you are going.
    Args:
        next_waypoint ():
        **kwargs ():

    Returns:
        lat_control
    """
    # calculate a vector that represent where you are going
    v_begin = self.agent.vehicle.transform.location.to_array()
    direction_vector = np.array([-np.sin(np.deg2rad(self.agent.vehicle.transform.rotation.yaw)),
                                 0,
                                 -np.cos(np.deg2rad(self.agent.vehicle.transform.rotation.yaw))])
    v_end = v_begin + direction_vector

    v_vec = np.array([(v_end[0] - v_begin[0]), 0, (v_end[2] - v_begin[2])])
    # calculate error projection
    w_vec = np.array(
        [
            next_waypoint.location.x - v_begin[0],
            0,
            next_waypoint.location.z - v_begin[2],
        ]
    )

    v_vec_normed = v_vec / np.linalg.norm(v_vec)
    w_vec_normed = w_vec / np.linalg.norm(w_vec)
    error = np.arccos(v_vec_normed @ w_vec_normed.T)
    _cross = np.cross(v_vec_normed, w_vec_normed)

    if _cross[1] > 0:
        error *= -1
    self._error_buffer.append(error)
    if len(self._error_buffer) >= 2:
        _de = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt
        _ie = sum(self._error_buffer) * self._dt
    else:
        _de = 0.0
        _ie = 0.0

    k_p, k_d, k_i = PIDController.find_k_values(config=self.config, vehicle=self.agent.vehicle)

    lat_control = float(
        np.clip((k_p * error) + (k_d * _de) + (k_i * _ie), self.steering_boundary[0], self.steering_boundary[1])
    )
    return lat_control

LongPIDController ¤

__init__(self, agent, config, throttle_boundary, max_speed, dt=0.03, **kwargs) special ¤

Source code in ROAR/control_module/pid_controller.py
def __init__(self, agent, config: dict, throttle_boundary: Tuple[float, float], max_speed: float,
             dt: float = 0.03, **kwargs):
    super().__init__(agent, **kwargs)
    self.config = config
    self.max_speed = max_speed
    self.throttle_boundary = throttle_boundary
    self._error_buffer = deque(maxlen=10)

    self._dt = dt

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
float

VehicleControl

Source code in ROAR/control_module/pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> float:
    target_speed = min(self.max_speed, kwargs.get("target_speed", self.max_speed))
    current_speed = Vehicle.get_speed(self.agent.vehicle)

    k_p, k_d, k_i = PIDController.find_k_values(vehicle=self.agent.vehicle, config=self.config)
    error = target_speed - current_speed

    self._error_buffer.append(error)

    if len(self._error_buffer) >= 2:
        # print(self._error_buffer[-1], self._error_buffer[-2])
        _de = (self._error_buffer[-2] - self._error_buffer[-1]) / self._dt
        _ie = sum(self._error_buffer) * self._dt
    else:
        _de = 0.0
        _ie = 0.0
    output = float(np.clip((k_p * error) + (k_d * _de) + (k_i * _ie), self.throttle_boundary[0],
                           self.throttle_boundary[1]))
    # self.logger.debug(f"curr_speed: {round(current_speed, 2)} | kp: {round(k_p, 2)} | kd: {k_d} | ki = {k_i} | "
    #       f"err = {round(error, 2)} | de = {round(_de, 2)} | ie = {round(_ie, 2)}")
          # f"self._error_buffer[-1] {self._error_buffer[-1]} | self._error_buffer[-2] = {self._error_buffer[-2]}")
    return output

PIDController ¤

__init__(self, agent, steering_boundary, throttle_boundary, **kwargs) special ¤

Source code in ROAR/control_module/pid_controller.py
def __init__(self, agent, steering_boundary: Tuple[float, float],
             throttle_boundary: Tuple[float, float], **kwargs):
    super().__init__(agent, **kwargs)
    self.max_speed = self.agent.agent_settings.max_speed
    self.throttle_boundary = throttle_boundary
    self.steering_boundary = steering_boundary
    self.config = json.load(Path(agent.agent_settings.pid_config_file_path).open(mode='r'))
    self.long_pid_controller = LongPIDController(agent=agent,
                                                 throttle_boundary=throttle_boundary,
                                                 max_speed=self.max_speed,
                                                 config=self.config["longitudinal_controller"])
    self.lat_pid_controller = LatPIDController(
        agent=agent,
        config=self.config["latitudinal_controller"],
        steering_boundary=steering_boundary
    )
    self.logger = logging.getLogger(__name__)

find_k_values(vehicle, config) staticmethod ¤

Source code in ROAR/control_module/pid_controller.py
@staticmethod
def find_k_values(vehicle: Vehicle, config: dict) -> np.array:
    current_speed = Vehicle.get_speed(vehicle=vehicle)
    k_p, k_d, k_i = 1, 0, 0
    for speed_upper_bound, kvalues in config.items():
        speed_upper_bound = float(speed_upper_bound)
        if current_speed < speed_upper_bound:
            k_p, k_d, k_i = kvalues["Kp"], kvalues["Kd"], kvalues["Ki"]
            break
    return np.clip([k_p, k_d, k_i], a_min=0, a_max=1)

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
VehicleControl

VehicleControl

Source code in ROAR/control_module/pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> VehicleControl:
    throttle = self.long_pid_controller.run_in_series(next_waypoint=next_waypoint,
                                                      target_speed=kwargs.get("target_speed", self.max_speed))
    steering = self.lat_pid_controller.run_in_series(next_waypoint=next_waypoint)
    return VehicleControl(throttle=throttle, steering=steering)

LatitunalPurePursuitController ¤

__init__(self, agent, look_ahead_gain, look_ahead_distance) special ¤

Source code in ROAR/control_module/pure_pursuit_control.py
def __init__(
        self, agent: Agent, look_ahead_gain: float,
        look_ahead_distance: float
):
    self.agent = agent
    self.look_ahead_gain = look_ahead_gain
    self.look_ahead_distance = look_ahead_distance

run_step(self, next_waypoint) ¤

Parameters:

Name Type Description Default
next_waypoint Transform required

Returns:

Type Description
float

VehicleControl.clamp

Source code in ROAR/control_module/pure_pursuit_control.py
def run_step(self, next_waypoint: Transform) -> float:
    """

    Args:
        next_waypoint ():

    Returns:
        VehicleControl.clamp
    """
    target_z = next_waypoint.location.z
    target_x = next_waypoint.location.x
    angle_difference = math.atan2(
        target_z - self.agent.vehicle.transform.location.z,
        target_x - self.agent.vehicle.transform.location.x
    ) - np.radians(self.agent.vehicle.transform.rotation.pitch)
    curr_look_forward = (
            self.look_ahead_gain * Vehicle.get_speed(vehicle=self.agent.vehicle)
            + self.look_ahead_distance
    )
    lateral_difference = math.atan2(
        2.0 * self.agent.vehicle.wheel_base * math.sin(angle_difference) / curr_look_forward,
        1.0,
    )
    return VehicleControl.clamp(lateral_difference, -1, 1)

LongitunalPurePursuitController ¤

__init__(self, agent, target_speed=60, kp=0.1) special ¤

Source code in ROAR/control_module/pure_pursuit_control.py
def __init__(self, agent: Agent, target_speed=60, kp=0.1):
    self.agent = agent
    self.target_speed = target_speed
    self.kp = kp

run_step(self) ¤

Source code in ROAR/control_module/pure_pursuit_control.py
def run_step(self) -> float:
    return float(
        VehicleControl.clamp(
            self.kp * (self.target_speed - Vehicle.get_speed(self.agent.vehicle)), 0,
            1
        )
    )

PurePursuitController ¤

__init__(self, agent, look_ahead_gain=0.1, look_ahead_distance=2, target_speed=60) special ¤

Parameters:

Name Type Description Default
vehicle

Vehicle information

required
look_ahead_gain float

Look ahead factor

0.1
look_ahead_distance float

look ahead distance

2
target_speed

desired longitudinal speed to maintain

60
Source code in ROAR/control_module/pure_pursuit_control.py
def __init__(
        self,
        agent: Agent,
        look_ahead_gain: float = 0.1,
        look_ahead_distance: float = 2,
        target_speed=60,
):
    """

    Args:
        vehicle: Vehicle information
        look_ahead_gain: Look ahead factor
        look_ahead_distance: look ahead distance
        target_speed: desired longitudinal speed to maintain
    """

    super(PurePursuitController, self).__init__(agent=agent)
    self.target_speed = self.agent.agent_config.max_speed \
        if self.agent.agent_config.max_speed else target_speed
    self.look_ahead_gain = look_ahead_gain
    self.look_ahead_distance = look_ahead_distance
    self.latitunal_controller = LatitunalPurePursuitController(
        agent=self.agent,
        look_ahead_gain=look_ahead_gain,
        look_ahead_distance=look_ahead_distance,
    )
    self.longitunal_controller = LongitunalPurePursuitController(
        agent=self.agent, target_speed=target_speed
    )

run_in_series(self, next_waypoint, **kwargs) ¤

run one step of Pure Pursuit Control

Parameters:

Name Type Description Default
vehicle

current vehicle state

required
next_waypoint Transform

Next waypoint, Transform

required
**kwargs {}

Returns:

Type Description
VehicleControl

Vehicle Control

Source code in ROAR/control_module/pure_pursuit_control.py
def run_in_series(
        self, next_waypoint: Transform, **kwargs
) -> VehicleControl:
    """
    run one step of Pure Pursuit Control

    Args:
        vehicle: current vehicle state
        next_waypoint: Next waypoint, Transform
        **kwargs:

    Returns:
        Vehicle Control

    """
    control = VehicleControl(
        throttle=self.longitunal_controller.run_step(),
        steering=self.latitunal_controller.run_step(next_waypoint=next_waypoint),
    )
    return control

LatPIDController ¤

__init__(self, agent, steering_boundary, dt=0.03, **kwargs) special ¤

Source code in ROAR/control_module/rl_pid_controller.py
def __init__(self, agent, steering_boundary: Tuple[float, float],
             dt: float = 0.03, **kwargs):
    super().__init__(agent, **kwargs)
    self.steering_boundary = steering_boundary
    self._error_buffer = deque(maxlen=10)
    self._dt = dt

find_k_values(self) ¤

Source code in ROAR/control_module/rl_pid_controller.py
def find_k_values(self) -> Tuple[float, float, float]:
    k_p = self.agent.kwargs.get("lat_k_p", 1)
    k_d = self.agent.kwargs.get("lat_k_d", 0)
    k_i = self.agent.kwargs.get("lat_k_i", 0)
    return k_p, k_d, k_i

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
float

VehicleControl

Source code in ROAR/control_module/rl_pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> float:
    # calculate a vector that represent where you are going
    v_begin = self.agent.vehicle.transform.location
    v_end = v_begin + Location(
        x=math.cos(math.radians(self.agent.vehicle.transform.rotation.pitch)),
        y=0,
        z=math.sin(math.radians(self.agent.vehicle.transform.rotation.pitch)),
    )
    v_vec = np.array([v_end.x - v_begin.x, 0, v_end.z - v_begin.z])
    # calculate error projection
    w_vec = np.array(
        [
            next_waypoint.location.x - v_begin.x,
            0,
            next_waypoint.location.z - v_begin.z,
        ]
    )
    _dot = math.acos(
        np.clip(
            np.dot(v_vec, w_vec) / (np.linalg.norm(w_vec) * np.linalg.norm(v_vec)),
            -1.0,
            1.0,
        )
    )
    _cross = np.cross(v_vec, w_vec)
    if _cross[1] > 0:
        _dot *= -1
    self._error_buffer.append(_dot)
    if len(self._error_buffer) >= 2:
        _de = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt
        _ie = sum(self._error_buffer) * self._dt
    else:
        _de = 0.0
        _ie = 0.0

    k_p, k_d, k_i = self.find_k_values()

    lat_control = float(
        np.clip((k_p * _dot) + (k_d * _de) + (k_i * _ie), self.steering_boundary[0], self.steering_boundary[1])
    )
    return lat_control

LongPIDController ¤

__init__(self, agent, throttle_boundary, max_speed, dt=0.03, **kwargs) special ¤

Source code in ROAR/control_module/rl_pid_controller.py
def __init__(self, agent, throttle_boundary: Tuple[float, float], max_speed: float,
             dt: float = 0.03, **kwargs):
    super().__init__(agent, **kwargs)
    self.max_speed = max_speed
    self.throttle_boundary = throttle_boundary
    self._error_buffer = deque(maxlen=10)

    self._dt = dt

find_k_values(self) ¤

Source code in ROAR/control_module/rl_pid_controller.py
def find_k_values(self) -> Tuple[float, float, float]:
    k_p = self.agent.kwargs.get("long_k_p", 1)
    k_d = self.agent.kwargs.get("long_k_d", 0)
    k_i = self.agent.kwargs.get("long_k_i", 0)
    return k_p, k_d, k_i

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
float

VehicleControl

Source code in ROAR/control_module/rl_pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> float:
    target_speed = min(self.max_speed, self.agent.kwargs.get("target_speed", self.max_speed))
    self.logger.debug(f"Target_Speed: {target_speed} | max_speed = {self.max_speed}")
    current_speed = Vehicle.get_speed(self.agent.vehicle)

    k_p, k_d, k_i = self.find_k_values()
    error = target_speed - current_speed

    self._error_buffer.append(error)

    if len(self._error_buffer) >= 2:
        # print(self._error_buffer[-1], self._error_buffer[-2])
        _de = (self._error_buffer[-2] - self._error_buffer[-1]) / self._dt
        _ie = sum(self._error_buffer) * self._dt
    else:
        _de = 0.0
        _ie = 0.0
    output = float(np.clip((k_p * error) + (k_d * _de) + (k_i * _ie), self.throttle_boundary[0],
                           self.throttle_boundary[1]))
    # self.logger.debug(f"curr_speed: {round(current_speed, 2)} | kp: {round(k_p, 2)} | kd: {k_d} | ki = {k_i} | "
    #       f"err = {round(error, 2)} | de = {round(_de, 2)} | ie = {round(_ie, 2)}")
    # f"self._error_buffer[-1] {self._error_buffer[-1]} | self._error_buffer[-2] = {self._error_buffer[-2]}")
    return output

PIDController ¤

__init__(self, agent, steering_boundary, throttle_boundary, **kwargs) special ¤

Source code in ROAR/control_module/rl_pid_controller.py
def __init__(self, agent: Agent, steering_boundary: Tuple[float, float],
             throttle_boundary: Tuple[float, float], **kwargs):
    super().__init__(agent, **kwargs)
    self.max_speed = self.agent.agent_settings.max_speed
    self.throttle_boundary = throttle_boundary
    self.steering_boundary = steering_boundary
    self.long_pid_controller = LongPIDController(agent=agent,
                                                 throttle_boundary=throttle_boundary,
                                                 max_speed=self.max_speed)
    self.lat_pid_controller = LatPIDController(
        agent=agent,
        steering_boundary=steering_boundary
    )
    self.logger = logging.getLogger(__name__)

run_in_series(self, next_waypoint, **kwargs) ¤

Abstract function for run step

Parameters:

Name Type Description Default
next_waypoint Transform

next waypoint

required
**kwargs {}

Returns:

Type Description
VehicleControl

VehicleControl

Source code in ROAR/control_module/rl_pid_controller.py
def run_in_series(self, next_waypoint: Transform, **kwargs) -> VehicleControl:
    throttle = self.long_pid_controller.run_in_series(next_waypoint=next_waypoint)
    steering = self.lat_pid_controller.run_in_series(next_waypoint=next_waypoint)
    return VehicleControl(throttle=throttle, steering=steering)