import numpy as np
import open3d as o3d
import torch
import trimesh
from scipy.spatial import cKDTree


class PtsUtil:

    @staticmethod
    def voxel_downsample_point_cloud(point_cloud, voxel_size=0.005):
        o3d_pc = o3d.geometry.PointCloud()
        o3d_pc.points = o3d.utility.Vector3dVector(point_cloud)
        downsampled_pc = o3d_pc.voxel_down_sample(voxel_size)
        return np.asarray(downsampled_pc.points)

    @staticmethod
    def random_downsample_point_cloud(point_cloud, num_points, require_idx=False):
        if point_cloud.shape[0] == 0:
            if require_idx:
                return point_cloud, np.array([], dtype=int)
            return point_cloud
        # Sample with replacement so num_points may exceed the cloud size.
        idx = np.random.choice(len(point_cloud), num_points, replace=True)
        if require_idx:
            return point_cloud[idx], idx
        return point_cloud[idx]

    @staticmethod
    def fps_downsample_point_cloud(point_cloud, num_points, require_idx=False):
        """Farthest-point sampling: greedily pick the point farthest from the
        current sample set, updating per-point distances incrementally."""
        N = point_cloud.shape[0]
        sampled_indices = np.zeros(num_points, dtype=int)
        sampled_indices[0] = np.random.randint(0, N)
        distances = np.linalg.norm(
            point_cloud - point_cloud[sampled_indices[0]], axis=1
        )
        for i in range(1, num_points):
            farthest_index = np.argmax(distances)
            sampled_indices[i] = farthest_index
            new_distances = np.linalg.norm(
                point_cloud - point_cloud[farthest_index], axis=1
            )
            distances = np.minimum(distances, new_distances)
        sampled_points = point_cloud[sampled_indices]
        if require_idx:
            return sampled_points, sampled_indices
        return sampled_points

    @staticmethod
    def random_downsample_point_cloud_tensor(point_cloud, num_points):
        idx = torch.randint(0, len(point_cloud), (num_points,))
        return point_cloud[idx]

    @staticmethod
    def voxelize_points(points, voxel_size):
        """Quantize points to integer voxel coordinates; return the unique
        voxels and, for each input point, the index of its voxel."""
        voxel_indices = np.floor(points / voxel_size).astype(np.int32)
        unique_voxels, inverse = np.unique(
            voxel_indices, axis=0, return_inverse=True
        )
        return unique_voxels, inverse

    @staticmethod
    def transform_point_cloud(points, pose_mat):
        points_h = np.concatenate([points, np.ones((points.shape[0], 1))], axis=1)
        points_h = np.dot(pose_mat, points_h.T).T
        return points_h[:, :3]

    @staticmethod
    def get_overlapping_points(
        point_cloud_L, point_cloud_R, voxel_size=0.005, require_idx=False
    ):
        voxels_L, indices_L = PtsUtil.voxelize_points(point_cloud_L, voxel_size)
        voxels_R, _ = PtsUtil.voxelize_points(point_cloud_R, voxel_size)

        # View each (x, y, z) voxel row as a single structured scalar so that
        # np.intersect1d matches whole voxels rather than individual coordinates.
        voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
        voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
        overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
        mask_L = np.isin(
            indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]
        )
        overlapping_points = point_cloud_L[mask_L]
        if require_idx:
            return overlapping_points, mask_L
        return overlapping_points

    @staticmethod
    def filter_points(
        points,
        points_normals,
        cam_pose,
        voxel_size=0.002,
        theta=45,
        z_range=(0.25, 0.5),
    ):
        """ filter with z range """
        points_cam = PtsUtil.transform_point_cloud(points, np.linalg.inv(cam_pose))
        idx = (points_cam[:, 2] > z_range[0]) & (points_cam[:, 2] < z_range[1])
        z_filtered_points = points[idx]

        """ filter with normal """
        sampled_points = PtsUtil.voxel_downsample_point_cloud(
            z_filtered_points, voxel_size
        )
        kdtree = cKDTree(points_normals[:, :3])
        _, indices = kdtree.query(sampled_points)
        nearest_points = points_normals[indices]

        # Keep points whose normal is within `theta` degrees of the camera's
        # viewing direction (the negated camera z-axis).
        normals = nearest_points[:, 3:]
        camera_axis = -cam_pose[:3, 2]
        normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
        cos_theta = np.dot(normals_normalized, camera_axis)
        theta_rad = np.deg2rad(theta)
        idx = cos_theta > np.cos(theta_rad)
        filtered_sampled_points = sampled_points[idx]
        return filtered_sampled_points[:, :3]
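    # Usage sketch for the sampling/overlap helpers above (illustrative only;
    # `cloud_a` and `cloud_b` are hypothetical (N, 3) arrays of the same scene):
    #
    #   pts = np.random.rand(10000, 3)
    #   voxel_pts = PtsUtil.voxel_downsample_point_cloud(pts, voxel_size=0.01)
    #   fps_pts, fps_idx = PtsUtil.fps_downsample_point_cloud(pts, 1024, require_idx=True)
    #   overlap = PtsUtil.get_overlapping_points(cloud_a, cloud_b, voxel_size=0.005)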
    @staticmethod
    def multi_scale_icp(
        source, target, voxel_size_range, init_transformation=None, steps=20
    ):
        pipreg = o3d.pipelines.registration
        if init_transformation is not None:
            current_transformation = init_transformation
        else:
            current_transformation = np.identity(4)
        best_score = 1e10
        best_reg = None

        # Linearly interpolate `steps` voxel sizes across the given range.
        voxel_sizes = []
        for i in range(steps):
            voxel_sizes.append(
                voxel_size_range[0]
                + i * (voxel_size_range[1] - voxel_size_range[0]) / steps
            )

        for voxel_size in voxel_sizes:
            radius_normal = voxel_size * 2
            source_downsampled = source.voxel_down_sample(voxel_size)
            source_downsampled.estimate_normals(
                search_param=o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_normal, max_nn=30
                )
            )
            target_downsampled = target.voxel_down_sample(voxel_size)
            target_downsampled.estimate_normals(
                search_param=o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_normal, max_nn=30
                )
            )
            reg_icp = pipreg.registration_icp(
                source_downsampled,
                target_downsampled,
                voxel_size * 2,
                current_transformation,
                pipreg.TransformationEstimationPointToPlane(),
                pipreg.ICPConvergenceCriteria(max_iteration=500),
            )
            # Lower RMSE per unit fitness is better; zero fitness means no inliers.
            if reg_icp.fitness == 0:
                score = 1e10
            else:
                score = reg_icp.inlier_rmse / reg_icp.fitness
            if score < best_score:
                best_score = score
                best_reg = reg_icp

        return best_reg, best_score

    @staticmethod
    def multi_scale_ransac(
        source_downsampled,
        target_downsampled,
        source_fpfh,
        model_fpfh,
        voxel_size_range,
        steps=20,
    ):
        pipreg = o3d.pipelines.registration
        best_score = 1e10
        best_reg = None

        voxel_sizes = []
        for i in range(steps):
            voxel_sizes.append(
                voxel_size_range[0]
                + i * (voxel_size_range[1] - voxel_size_range[0]) / steps
            )

        for voxel_size in voxel_sizes:
            reg_ransac = pipreg.registration_ransac_based_on_feature_matching(
                source_downsampled,
                target_downsampled,
                source_fpfh,
                model_fpfh,
                mutual_filter=True,
                max_correspondence_distance=voxel_size * 2,
                estimation_method=pipreg.TransformationEstimationPointToPoint(False),
                ransac_n=4,
                checkers=[pipreg.CorrespondenceCheckerBasedOnEdgeLength(0.9)],
                criteria=pipreg.RANSACConvergenceCriteria(8000000, 500),
            )
            if reg_ransac.fitness == 0:
                score = 1e10
            else:
                score = reg_ransac.inlier_rmse / reg_ransac.fitness
            if score < best_score:
                best_score = score
                best_reg = reg_ransac

        return best_reg, best_score

    @staticmethod
    def register(pcl: np.ndarray, model: trimesh.Trimesh, voxel_size=0.01):
        """Coarse-to-fine registration of a scanned cloud against a mesh model:
        FPFH features + RANSAC for a global initial pose, refined by
        multi-scale point-to-plane ICP."""
        radius_normal = voxel_size * 2
        pipreg = o3d.pipelines.registration

        model_pcd = o3d.geometry.PointCloud()
        model_pcd.points = o3d.utility.Vector3dVector(model.vertices)
        model_downsampled = model_pcd.voxel_down_sample(voxel_size)
        model_downsampled.estimate_normals(
            search_param=o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_normal, max_nn=30
            )
        )
        model_fpfh = pipreg.compute_fpfh_feature(
            model_downsampled,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
        )

        source_pcd = o3d.geometry.PointCloud()
        source_pcd.points = o3d.utility.Vector3dVector(pcl)
        source_downsampled = source_pcd.voxel_down_sample(voxel_size)
        source_downsampled.estimate_normals(
            search_param=o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_normal, max_nn=30
            )
        )
        source_fpfh = pipreg.compute_fpfh_feature(
            source_downsampled,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
        )

        reg_ransac, ransac_best_score = PtsUtil.multi_scale_ransac(
            source_downsampled,
            model_downsampled,
            source_fpfh,
            model_fpfh,
            voxel_size_range=(0.03, 0.005),
            steps=3,
        )
        reg_icp, icp_best_score = PtsUtil.multi_scale_icp(
            source_downsampled,
            model_downsampled,
            voxel_size_range=(0.02, 0.001),
            init_transformation=reg_ransac.transformation,
            steps=50,
        )
        return reg_icp.transformation
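    # Usage sketch for the registration pipeline above (illustrative only;
    # "model.ply" is a placeholder path, `scan_pts` a hypothetical (N, 3) array):
    #
    #   mesh = trimesh.load("model.ply")
    #   T = PtsUtil.register(scan_pts, mesh, voxel_size=0.01)
    #   aligned = PtsUtil.transform_point_cloud(scan_pts, T)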
    @staticmethod
    def get_pts_from_depth(depth, cam_intrinsic, cam_extrinsic):
        """Back-project a depth map to camera- and world-frame point clouds
        with the pinhole model: x = (u - cx) * z / fx, y = (v - cy) * z / fy."""
        h, w = depth.shape
        i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
        z = depth
        x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
        y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
        points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)

        # Homogenize, then map into the world frame with the camera extrinsic.
        points_camera_h = np.concatenate(
            [points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
        )
        points_world = np.dot(cam_extrinsic, points_camera_h.T).T[:, :3]
        data = {
            "points_world": points_world,
            "points_camera": points_camera,
        }
        return data
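
if __name__ == "__main__":
    # Minimal smoke test on synthetic data; not part of the original module.
    # The intrinsics/extrinsics below are arbitrary placeholder values.
    depth = np.full((480, 640), 0.5, dtype=np.float32)
    cam_intrinsic = np.array(
        [[600.0, 0.0, 320.0], [0.0, 600.0, 240.0], [0.0, 0.0, 1.0]]
    )
    cam_extrinsic = np.eye(4)
    data = PtsUtil.get_pts_from_depth(depth, cam_intrinsic, cam_extrinsic)
    print("world points:", data["points_world"].shape)  # (307200, 3)

    pts = np.random.rand(4096, 3)
    fps_pts = PtsUtil.fps_downsample_point_cloud(pts, 256)
    print("fps points:", fps_pts.shape)  # (256, 3)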