Skip to content

Perception Module

DepthToPointCloudDetector ¤

__init__(self, agent, should_compute_global_pointcloud=False, should_sample_points=False, should_filter_by_distance=False, max_detectable_distance=1, scale_factor=1000, max_points_to_convert=10000, **kwargs) special ¤

Source code in ROAR/perception_module/depth_to_pointcloud_detector.py
def __init__(self,
             agent: Agent,
             should_compute_global_pointcloud: bool = False,
             should_sample_points: bool = False,
             should_filter_by_distance: bool = False,
             max_detectable_distance: float = 1,
             scale_factor: int = 1000,
             max_points_to_convert: int = 10000, **kwargs):
    """
    Detector that converts front depth-camera frames into point clouds.

    Args:
        agent: owning Agent; provides the depth camera and vehicle transform.
        should_compute_global_pointcloud: flag stored on the instance;
            consumers are not visible in this block — TODO confirm usage.
        should_sample_points: flag stored on the instance (not consulted by
            the run_in_series shown in this file — confirm intended use).
        should_filter_by_distance: whether to filter points by distance
            (annotation fixed: this is a boolean flag, not a float).
        max_detectable_distance: distance cut-off, presumably in the depth
            image's units — TODO confirm against camera config.
        max_points_to_convert: cap on points to convert when sampling.
        scale_factor: depth-to-unit scale factor (default 1000, i.e. mm→m
            presumably — confirm against depth camera output).
        **kwargs: forwarded to the base detector.
    """
    super().__init__(agent, **kwargs)
    self.should_compute_global_pointcloud = should_compute_global_pointcloud
    self.should_sample_points = should_sample_points
    self.should_filter_by_distance = should_filter_by_distance
    self.max_detectable_distance = max_detectable_distance
    self.max_points_to_convert = max_points_to_convert
    self.scale_factor = scale_factor

find_fps(t1, t2) staticmethod ¤

Source code in ROAR/perception_module/depth_to_pointcloud_detector.py
@staticmethod
def find_fps(t1, t2):
    return 1 / (t2 - t1)

run_in_series(self) ¤

:return: N x 3 array of point cloud (the implementation returns points.T[:, :3])

Source code in ROAR/perception_module/depth_to_pointcloud_detector.py
def run_in_series(self) -> Optional[np.ndarray]:
    """
    Convert the current front depth frame into a world-frame point cloud.

    :return: N x 3 array of world-frame points, or None when no depth frame
        is available. (The final ``points.T[:, :3]`` below yields N x 3,
        not 3 x N.)
    """
    if self.agent.front_depth_camera.data is not None:
        # --- earlier repmat-based back-projection, kept commented for reference ---
        # depth_img = self.agent.front_depth_camera.data.copy()
        # pixel_length = self.agent.front_depth_camera.image_size_x * self.agent.front_depth_camera.image_size_y
        # u_coord = repmat(np.r_[self.agent.front_depth_camera.image_size_x - 1:-1:-1],
        #                  self.agent.front_depth_camera.image_size_y, 1).reshape(pixel_length)
        # v_coord = repmat(np.c_[self.agent.front_depth_camera.image_size_y - 1:-1:-1],
        #                  1, self.agent.front_depth_camera.image_size_x).reshape(pixel_length)
        #
        # normalized_depth = np.reshape(depth_img, pixel_length)
        # p2d = np.array([u_coord, v_coord, np.ones_like(u_coord)])
        # p3d = np.dot(np.linalg.inv(self.agent.front_depth_camera.intrinsics_matrix), p2d)
        # return p3d
        # p3d *= normalized_depth * 1000
        # return p3d
        depth_img = self.agent.front_depth_camera.data.copy()
        # NOTE(review): the `< 10` cut-off is hard-coded and ignores
        # self.max_detectable_distance / self.should_filter_by_distance —
        # confirm whether those constructor flags were meant to apply here.
        coords = np.where(depth_img < 10)  # (row_indices, col_indices) of pixels passing the cut-off
        # _pix2xyz is defined elsewhere in the class; presumably combines pixel
        # indices with depth into 3 x N homogeneous pixel coords — TODO confirm.
        raw_p2d = np.reshape(self._pix2xyz(depth_img=depth_img, i=coords[0], j=coords[1]),
                             (3, np.shape(coords)[1])).T  # N x 3

        # Back-project through the inverse camera intrinsics; axis order implied
        # by the variable name is (y, -z, -x) — sign flips below re-orient it.
        cords_y_minus_z_x = np.linalg.inv(self.agent.front_depth_camera.intrinsics_matrix) @ raw_p2d.T
        # Flip two axes and append a row of ones -> 4 x N homogeneous coords.
        cords_xyz_1 = np.vstack([
            cords_y_minus_z_x[0, :],
            -cords_y_minus_z_x[1, :],
            -cords_y_minus_z_x[2, :],
            np.ones((1, np.shape(cords_y_minus_z_x)[1]))
        ])
        # Vehicle pose matrix maps local coords into the world frame.
        points = self.agent.vehicle.transform.get_matrix() @ cords_xyz_1
        points = points.T[:, :3]  # drop the homogeneous coordinate -> N x 3
        return points
    return None

run_in_threaded(self, **kwargs) ¤

This is the threaded function.

Parameters:

Name Type Description Default
**kwargs {}
Source code in ROAR/perception_module/depth_to_pointcloud_detector.py
def run_in_threaded(self, **kwargs):
    """
    Threaded entry point: loop forever, recomputing the point cloud and
    publishing it to consumers via ``self.agent.kwargs["point_cloud"]``.

    :param kwargs: unused; accepted for interface compatibility.
    """
    while True:
        self.agent.kwargs["point_cloud"] = self.run_in_series()

Detector ¤

__init__(self, agent, **kwargs) special ¤

Source code in ROAR/perception_module/detector.py
def __init__(self, agent: Agent, **kwargs):
    """
    Base detector: stores a reference to the owning agent and sets up a logger.

    :param agent: the Agent this detector observes.
    :param kwargs: forwarded to the next class in the MRO.
    """
    super().__init__(**kwargs)
    self.logger = logging.getLogger("Base Detector")
    self.agent = agent

GroundPlaneDetector ¤

__init__(self, agent, knn=200, res=4, **kwargs) special ¤

Source code in ROAR/perception_module/ground_plane_detector.py
def __init__(self, agent: Agent, knn: int = 200, res: int = 4, **kwargs):
    """
    Ground-plane detector operating on the agent's published point cloud.

    :param agent: owning Agent (supplies cameras and vehicle transform).
    :param knn: neighbourhood size used when (re)computing the reference normal.
    :param res: sub-sampling resolution; neighbour indices stride by res**2.
    :param kwargs: forwarded to the base detector.
    """
    super().__init__(agent, **kwargs)
    self.knn = knn
    self.res = res
    self.threshold = 0.15
    # Default ground-plane normal, used until recomputed from live data.
    self.reference_norm: Optional[np.ndarray] = np.array([-0.00000283, -0.00012446, 0.99999999])
    # Precompute flattened neighbour indices for fast normal estimation.
    self.f1, self.f2, self.f3, self.f4 = self.compute_vectors_near_me(res)

compute_reference_norm(self, pcd) ¤

Source code in ROAR/perception_module/ground_plane_detector.py
def compute_reference_norm(self, pcd: o3d.geometry.PointCloud):
    """
    Re-estimate the ground-plane reference normal from the points nearest
    the vehicle and store it in ``self.reference_norm``.

    :param pcd: point cloud to query.
    """
    tree = o3d.geometry.KDTreeFlann(pcd)  # KD-tree for fast nearest-neighbour lookup
    ego_position = self.agent.vehicle.transform.location.to_array()
    [k, idx, _] = tree.search_knn_vector_3d(ego_position,
                                            knn=self.knn)  # indices of the knn points around the vehicle
    neighbours = np.asarray(pcd.points)[idx, :]  # knn x 3
    # SVD of the neighbourhood: the right singular vector with the smallest
    # singular value approximates the local plane normal.
    _, _, vh = np.linalg.svd(neighbours, full_matrices=False)
    self.reference_norm = vh[2, :]

compute_vectors_near_me(self, res) ¤

Source code in ROAR/perception_module/ground_plane_detector.py
def compute_vectors_near_me(self, res):
    """
    Precompute flattened pixel indices of each pixel's 4-neighbourhood
    (down, up, right, left), clipped at the image border and sub-sampled
    every ``res**2`` pixels.

    :param res: sub-sampling resolution (stride is res squared).
    :return: tuple (f1, f2, f3, f4) of index arrays — row+1, row-1,
             col+1, col-1 neighbours respectively.
    """
    height = self.agent.front_depth_camera.image_size_y
    width = self.agent.front_depth_camera.image_size_x
    rows, cols = np.indices((height, width))
    row_max, col_max = rows.max(), cols.max()
    step = res ** 2

    # Border pixels are clamped so every pixel has four valid neighbours.
    rows_down = np.clip(rows + 1, 0, row_max).flatten()
    rows_up = np.clip(rows - 1, 0, row_max).flatten()
    cols_right = np.clip(cols + 1, 0, col_max).flatten()
    cols_left = np.clip(cols - 1, 0, col_max).flatten()
    rows_flat = rows.flatten()
    cols_flat = cols.flatten()

    f1 = (rows_down * width + cols_flat)[::step]
    f2 = (rows_up * width + cols_flat)[::step]
    f3 = (rows_flat * width + cols_right)[::step]
    f4 = (rows_flat * width + cols_left)[::step]
    return f1, f2, f3, f4

construct_pointcloud(points) staticmethod ¤

Source code in ROAR/perception_module/ground_plane_detector.py
@staticmethod
def construct_pointcloud(points) -> o3d.geometry.PointCloud:
    """
    Wrap an N x 3 array of points into an Open3D point cloud and estimate
    per-point normals.

    :param points: N x 3 array-like of xyz coordinates.
    :return: the constructed Open3D point cloud.
    """
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(points)
    cloud.estimate_normals()
    return cloud

normalize_v3(arr) staticmethod ¤

Source code in ROAR/perception_module/ground_plane_detector.py
@staticmethod
def normalize_v3(arr):
    lens = np.sqrt(arr[:, 0] ** 2 + arr[:, 1] ** 2 + arr[:, 2] ** 2)
    lens[lens <= 0] = 1
    arr[:, 0] /= lens
    arr[:, 1] /= lens
    arr[:, 2] /= lens
    return arr

run_in_series(self) ¤

Segments and visualizes the ground plane; displays an annotated image and returns no point cloud.

Source code in ROAR/perception_module/ground_plane_detector.py
def run_in_series(self) -> Any:
    """
    Segment the ground plane from the published point cloud and display it.

    Estimates a per-pixel surface normal via finite differences on the
    (sub-sampled) point cloud, flood-fills a region of similar normals
    starting near the bottom-center of the image, upsamples the resulting
    mask to full resolution, and paints the matching RGB pixels white in
    an OpenCV window. Returns None implicitly — the result is visual only.
    """
    if self.agent.kwargs.get("point_cloud", None) is not None:
        try:
            points: np.ndarray = self.agent.kwargs.get("point_cloud").copy()
            # Finite-difference tangent vectors using the neighbour indices
            # precomputed in compute_vectors_near_me (stride res**2).
            x = points[self.f3, :] - points[self.f4, :]
            y = points[self.f1, :] - points[self.f2, :]
            # Cross product of the tangents -> surface normal per sample.
            normals = self.normalize_v3(np.cross(x, y))
            # OpenCV FloodFill
            d1 = h = self.agent.front_depth_camera.image_size_y
            d2 = w = self.agent.front_depth_camera.image_size_x
            # Normals reshaped to the sub-sampled image grid (d1/res x d2/res x 3).
            curr_img = normals.reshape((int(d1/self.res), int(d2/self.res), 3)).astype(np.float32)

            min_x, max_x = 0, h // self.res
            min_y, max_y = w * 3 // 4 // self.res, w

            # Y_norm_array: np.ndarray = curr_img[min_x:max_x, min_y:max_y, 1]
            # x, y = np.unravel_index(np.argmax(Y_norm_array), np.shape(Y_norm_array))
            # seed_w, seed_h = y + min_y, x + min_x
            # print(seed_w, seed_h, np.shape(curr_img))

            # Seed near the bottom-center of the image — assumed to be road.
            seed_point = (int(d1/self.res) - 10, int(int(d2/self.res) / 2))
            # NOTE(review): cv2.floodFill returns (retval, image, mask, rect);
            # the name `retval` below actually receives the flood-filled IMAGE,
            # not the pixel count — misleading name, but usage is consistent.
            _, retval, _, _ = cv2.floodFill(image=curr_img,
                                            seedPoint=seed_point,
                                            newVal=(0, 0, 0),
                                            loDiff=(self.threshold,self.threshold,self.threshold),
                                            upDiff=(self.threshold,self.threshold,self.threshold),
                                            mask=None,
                                            flags=8)
            # Pixels set to (0,0,0) by the fill are the detected ground region.
            bool_matrix = np.mean(retval, axis=2) == 0
            # Scatter the low-res mask back onto the full-resolution grid;
            # the res**2 stride matches the sub-sampling above.
            bool_zeros = np.zeros(d1 * d2).flatten()
            bool_indices = np.indices(bool_zeros.shape)[0][::self.res**2]
            bool_zeros[bool_indices] = bool_matrix.flatten()
            bool_matrix = bool_zeros.reshape((d1, d2))

            # Paint ground pixels white on the RGB frame and show it.
            color_image = self.agent.front_rgb_camera.data.copy()
            color_image[bool_matrix > 0] = 255
            cv2.imshow('Color', color_image)
            cv2.waitKey(1)
        except Exception as e:
            # Best-effort: failures (bad shapes, missing frames) are logged
            # and skipped rather than crashing the perception loop.
            self.logger.error(e)