import numpy as np
import open3d as o3d
import torch
import trimesh
from scipy.spatial import cKDTree

from utils.pose_util import PoseUtil


class PtsUtil:

    @staticmethod
    def voxel_downsample_point_cloud(point_cloud, voxel_size=0.005):
        """Voxel-grid downsample an (N, 3) point cloud via Open3D."""
        o3d_pc = o3d.geometry.PointCloud()
        o3d_pc.points = o3d.utility.Vector3dVector(point_cloud)
        downsampled_pc = o3d_pc.voxel_down_sample(voxel_size)
        return np.asarray(downsampled_pc.points)

    @staticmethod
    def random_downsample_point_cloud(point_cloud, num_points, require_idx=False):
        """Uniformly sample num_points indices; duplicates occur only when
        the cloud has fewer points than requested."""
        if point_cloud.shape[0] == 0:
            if require_idx:
                return point_cloud, np.array([], dtype=int)
            return point_cloud
        # Sample with replacement only when the cloud is smaller than
        # num_points, so no point is duplicated unnecessarily.
        replace = point_cloud.shape[0] < num_points
        idx = np.random.choice(len(point_cloud), num_points, replace=replace)
        if require_idx:
            return point_cloud[idx], idx
        return point_cloud[idx]

    @staticmethod
    def fps_downsample_point_cloud(point_cloud, num_points, require_idx=False):
        """Farthest point sampling: greedily pick the point farthest from
        the set selected so far."""
        N = point_cloud.shape[0]
        sampled_indices = np.zeros(num_points, dtype=int)
        sampled_indices[0] = np.random.randint(0, N)
        # distances[k] = distance from point k to its nearest sampled point.
        distances = np.linalg.norm(
            point_cloud - point_cloud[sampled_indices[0]], axis=1
        )
        for i in range(1, num_points):
            farthest_index = np.argmax(distances)
            sampled_indices[i] = farthest_index
            new_distances = np.linalg.norm(
                point_cloud - point_cloud[farthest_index], axis=1
            )
            distances = np.minimum(distances, new_distances)

        sampled_points = point_cloud[sampled_indices]
        if require_idx:
            return sampled_points, sampled_indices
        return sampled_points
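
    # Note: the FPS loop above is O(N * num_points) in time; for very large
    # clouds a batched torch implementation may be preferable.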

    @staticmethod
    def random_downsample_point_cloud_tensor(point_cloud, num_points):
        """Torch variant: sample num_points indices with replacement."""
        idx = torch.randint(0, len(point_cloud), (num_points,))
        return point_cloud[idx]

    @staticmethod
    def voxelize_points(points, voxel_size):
        """Quantize points to integer voxel coordinates. Returns the unique
        voxels and, for every input point, the index of its voxel row."""
        voxel_indices = np.floor(points / voxel_size).astype(np.int32)
        unique_voxels, inverse = np.unique(
            voxel_indices, axis=0, return_inverse=True
        )
        return unique_voxels, inverse

    @staticmethod
    def transform_point_cloud(points, pose_mat):
        """Apply a 4x4 homogeneous transform to an (N, 3) point cloud."""
        points_h = np.concatenate([points, np.ones((points.shape[0], 1))], axis=1)
        points_h = np.dot(pose_mat, points_h.T).T
        return points_h[:, :3]

    @staticmethod
    def get_overlapping_points(
        point_cloud_L, point_cloud_R, voxel_size=0.005, require_idx=False
    ):
        """Return the points of point_cloud_L that fall into voxels also
        occupied by point_cloud_R."""
        voxels_L, indices_L = PtsUtil.voxelize_points(point_cloud_L, voxel_size)
        voxels_R, _ = PtsUtil.voxelize_points(point_cloud_R, voxel_size)

        # View each (M, 3) integer array as a structured array so that
        # np.intersect1d compares whole rows instead of single scalars.
        voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
        voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
        overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
        # indices_L maps every point to its row in voxels_L, so a point is
        # kept iff that row is one of the overlapping voxels.
        mask_L = np.isin(
            indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]
        )
        overlapping_points = point_cloud_L[mask_L]
        if require_idx:
            return overlapping_points, mask_L
        return overlapping_points
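
    # Illustration of the structured-dtype trick (comment sketch, not run):
    #   a = np.array([[0, 0, 1], [2, 3, 4]], dtype=np.int32)
    #   a.view([("", a.dtype)] * 3)  # rows become single records, so
    #                                # np.intersect1d can match whole voxels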

    @staticmethod
    def filter_points(
        points,
        points_normals,
        cam_pose,
        voxel_size=0.002,
        theta=45,
        z_range=(0.25, 0.5),
    ):
        """Keep points inside the camera-frame z_range whose normals face
        the camera within theta degrees."""
        # Filter by depth in the camera frame.
        points_cam = PtsUtil.transform_point_cloud(points, np.linalg.inv(cam_pose))
        idx = (points_cam[:, 2] > z_range[0]) & (points_cam[:, 2] < z_range[1])
        z_filtered_points = points[idx]

        # Filter by normal orientation: look up each downsampled point's
        # nearest neighbor in points_normals (x, y, z, nx, ny, nz).
        sampled_points = PtsUtil.voxel_downsample_point_cloud(
            z_filtered_points, voxel_size
        )
        kdtree = cKDTree(points_normals[:, :3])
        _, indices = kdtree.query(sampled_points)
        nearest_points = points_normals[indices]

        normals = nearest_points[:, 3:]
        camera_axis = -cam_pose[:3, 2]
        normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
        cos_theta = np.dot(normals_normalized, camera_axis)
        theta_rad = np.deg2rad(theta)
        idx = cos_theta > np.cos(theta_rad)
        filtered_sampled_points = sampled_points[idx]
        return filtered_sampled_points[:, :3]
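
    # With the default theta=45, a point survives the normal test when the
    # angle between its normal and the direction toward the camera is under
    # 45 degrees, i.e. cos_theta > cos(45 deg) ~= 0.707.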

    @staticmethod
    def multi_scale_icp(
        source, target, voxel_size_range, init_transformation=None, steps=20
    ):
        """Run point-to-plane ICP over a schedule of voxel sizes and keep
        the result with the lowest inlier_rmse / fitness score."""
        pipreg = o3d.pipelines.registration

        if init_transformation is not None:
            current_transformation = init_transformation
        else:
            current_transformation = np.identity(4)
        best_score = float("inf")
        best_reg = None
        # Evenly spaced voxel sizes from voxel_size_range[0] toward [1]
        # (endpoint excluded, one size per step).
        voxel_sizes = np.linspace(
            voxel_size_range[0], voxel_size_range[1], steps, endpoint=False
        )

        for voxel_size in voxel_sizes:
            radius_normal = voxel_size * 2
            source_downsampled = source.voxel_down_sample(voxel_size)
            source_downsampled.estimate_normals(
                search_param=o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_normal, max_nn=30
                )
            )
            target_downsampled = target.voxel_down_sample(voxel_size)
            target_downsampled.estimate_normals(
                search_param=o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_normal, max_nn=30
                )
            )

            reg_icp = pipreg.registration_icp(
                source_downsampled,
                target_downsampled,
                voxel_size * 2,
                current_transformation,
                pipreg.TransformationEstimationPointToPlane(),
                pipreg.ICPConvergenceCriteria(max_iteration=500),
            )
            if reg_icp.fitness == 0:
                score = float("inf")
            else:
                score = reg_icp.inlier_rmse / reg_icp.fitness

            # Record the first result even if degenerate so the function
            # never returns best_reg=None.
            if best_reg is None or score < best_score:
                best_score = score
                best_reg = reg_icp

        return best_reg, best_score

    @staticmethod
    def multi_scale_ransac(
        source_downsampled, target_downsampled, source_fpfh, model_fpfh,
        voxel_size_range, steps=20
    ):
        """RANSAC registration on FPFH feature matches over a schedule of
        correspondence distances; keeps the best-scoring result."""
        pipreg = o3d.pipelines.registration
        best_score = float("inf")
        best_reg = None
        voxel_sizes = np.linspace(
            voxel_size_range[0], voxel_size_range[1], steps, endpoint=False
        )

        for voxel_size in voxel_sizes:
            reg_ransac = pipreg.registration_ransac_based_on_feature_matching(
                source_downsampled,
                target_downsampled,
                source_fpfh,
                model_fpfh,
                mutual_filter=True,
                max_correspondence_distance=voxel_size * 2,
                estimation_method=pipreg.TransformationEstimationPointToPoint(False),
                ransac_n=4,
                checkers=[pipreg.CorrespondenceCheckerBasedOnEdgeLength(0.9)],
                criteria=pipreg.RANSACConvergenceCriteria(8000000, 500),
            )
            if reg_ransac.fitness == 0:
                score = float("inf")
            else:
                score = reg_ransac.inlier_rmse / reg_ransac.fitness
            if best_reg is None or score < best_score:
                best_score = score
                best_reg = reg_ransac

        return best_reg, best_score

    @staticmethod
    def register(pcl: np.ndarray, model: trimesh.Trimesh, voxel_size=0.01):
        """Coarse-to-fine registration of a point cloud to a mesh: FPFH +
        RANSAC for a global initial pose, refined by multi-scale ICP."""
        radius_normal = voxel_size * 2
        pipreg = o3d.pipelines.registration
        model_pcd = o3d.geometry.PointCloud()
        model_pcd.points = o3d.utility.Vector3dVector(model.vertices)
        model_downsampled = model_pcd.voxel_down_sample(voxel_size)
        model_downsampled.estimate_normals(
            search_param=o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_normal, max_nn=30
            )
        )
        model_fpfh = pipreg.compute_fpfh_feature(
            model_downsampled,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
        )

        source_pcd = o3d.geometry.PointCloud()
        source_pcd.points = o3d.utility.Vector3dVector(pcl)
        source_downsampled = source_pcd.voxel_down_sample(voxel_size)
        source_downsampled.estimate_normals(
            search_param=o3d.geometry.KDTreeSearchParamHybrid(
                radius=radius_normal, max_nn=30
            )
        )
        source_fpfh = pipreg.compute_fpfh_feature(
            source_downsampled,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
        )

        reg_ransac, ransac_best_score = PtsUtil.multi_scale_ransac(
            source_downsampled,
            model_downsampled,
            source_fpfh,
            model_fpfh,
            voxel_size_range=(0.03, 0.005),
            steps=3,
        )
        reg_icp, icp_best_score = PtsUtil.multi_scale_icp(
            source_downsampled,
            model_downsampled,
            voxel_size_range=(0.02, 0.001),
            init_transformation=reg_ransac.transformation,
            steps=50,
        )
        return reg_icp.transformation
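
    # Typical use (sketch; the mesh path is hypothetical):
    #   model = trimesh.load("assets/object.obj")
    #   T = PtsUtil.register(scan_points, model)  # 4x4 scan-to-model pose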

    @staticmethod
    def get_pts_from_depth(depth, cam_intrinsic, cam_extrinsic):
        """Back-project a depth map to 3-D points in the camera and world
        frames."""
        h, w = depth.shape
        i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")

        # Pinhole back-projection: x = (u - cx) * z / fx, y = (v - cy) * z / fy.
        z = depth
        x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
        y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]

        points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
        points_camera_h = np.concatenate(
            [points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
        )

        points_world = np.dot(cam_extrinsic, points_camera_h.T).T[:, :3]
        data = {
            "points_world": points_world,
            "points_camera": points_camera,
        }
        return data
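

if __name__ == "__main__":
    # Minimal smoke-test sketch for the numpy utilities; the shapes and
    # values below are illustrative assumptions, not part of any pipeline.
    rng = np.random.default_rng(0)
    pts = rng.random((1000, 3))

    down_voxel = PtsUtil.voxel_downsample_point_cloud(pts, voxel_size=0.05)
    down_rand = PtsUtil.random_downsample_point_cloud(pts, 256)
    down_fps = PtsUtil.fps_downsample_point_cloud(pts, 64)
    print(down_voxel.shape, down_rand.shape, down_fps.shape)

    # Rigid-transform round trip: a pure +0.1 translation along x.
    pose = np.identity(4)
    pose[0, 3] = 0.1
    moved = PtsUtil.transform_point_cloud(pts, pose)
    assert np.allclose(moved[:, 0] - pts[:, 0], 0.1)

    # Overlap between a cloud and a slightly jittered copy of itself.
    overlap = PtsUtil.get_overlapping_points(pts, pts + 0.001, voxel_size=0.01)
    print(overlap.shape)

    # Back-project a constant synthetic depth map with a simple intrinsic.
    depth = np.full((48, 64), 0.5)
    K = np.array([[60.0, 0.0, 32.0], [0.0, 60.0, 24.0], [0.0, 0.0, 1.0]])
    data = PtsUtil.get_pts_from_depth(depth, K, np.identity(4))
    print(data["points_world"].shape)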