# new_nbv_rec/runners/evaluate_pbnbv.py
import numpy as np
from sklearn.mixture import GaussianMixture
from typing import List, Tuple, Dict
from enum import Enum
class VoxelType(Enum):
NONE = 0
OCCUPIED = 1
EMPTY = 2
UNKNOWN = 3
FRONTIER = 4
class VoxelStruct:
def __init__(self, voxel_resolution=0.01, ray_trace_step=0.01, surrounding_radius=1,
num_parallels=10, viewpoints_per_parallel=10, camera_working_distance=0.5):
self.voxel_resolution = voxel_resolution
self.ray_trace_step = ray_trace_step
self.surrounding_radius = surrounding_radius
self.num_parallels = num_parallels
self.viewpoints_per_parallel = viewpoints_per_parallel
self.camera_working_distance = camera_working_distance
self.occupied_voxels = []
self.empty_voxels = []
self.unknown_voxels = []
self.frontier_voxels = []
self.bbx_min = None
self.bbx_max = None
self.voxel_types: Dict[Tuple[float, float, float], VoxelType] = {}
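    # Per-frame update pipeline: transform the incoming points by camera_pose,
    # voxelize them as OCCUPIED, grow the bounding box, re-run ray tracing from
    # the new camera pose, and finally extract the frontier voxels.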
def update_voxel_map(self, points: np.ndarray,
camera_pose: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
points = self.transform_points(points, camera_pose)
new_occupied = self.voxelize_points(points)
self.occupied_voxels.extend(new_occupied)
self.update_bounding_box()
self.ray_tracing(camera_pose[:3, 3], camera_pose[:3, :3])
self.update_frontier_voxels()
return self.frontier_voxels, self.occupied_voxels
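    # Ray tracing marks free space: each ray is marched from the camera through
    # the bounding box. Voxels traversed before the first occupied hit are
    # collected as candidates; if the ray never hits occupancy they are marked
    # UNKNOWN, otherwise candidates that were previously UNKNOWN are
    # reclassified as EMPTY and voxels behind the hit stay UNKNOWN.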
def ray_tracing(self, camera_position: np.ndarray, camera_rotation: np.ndarray):
if self.bbx_min is None or self.bbx_max is None:
return
directions = self.generate_ray_directions()
for direction in directions:
direction_cam = camera_rotation @ direction
current_pos = camera_position.copy()
cnt = 0
while not self.is_in_bounding_box(current_pos):
current_pos -= direction_cam * self.ray_trace_step*2
cnt += 1
if cnt > 200:
break
occupied_flag = False
maybe_unknown_voxels = []
while self.is_in_bounding_box(current_pos):
voxel = self.get_voxel_coordinate(current_pos)
voxel_key = tuple(voxel)
if self.is_occupied(voxel):
current_pos -= direction_cam * self.ray_trace_step
occupied_flag = True
continue
if not occupied_flag:
if voxel_key not in self.voxel_types or self.voxel_types[voxel_key] == VoxelType.NONE or self.voxel_types[voxel_key] == VoxelType.UNKNOWN:
maybe_unknown_voxels.append(voxel)
else:
if voxel_key not in self.voxel_types or self.voxel_types[voxel_key] == VoxelType.NONE:
self.voxel_types[voxel_key] = VoxelType.UNKNOWN
self.unknown_voxels.append(voxel)
current_pos -= direction_cam * self.ray_trace_step
if not occupied_flag:
for voxel in maybe_unknown_voxels:
self.voxel_types[tuple(voxel)] = VoxelType.UNKNOWN
self.unknown_voxels.append(voxel)
else:
for voxel in maybe_unknown_voxels:
voxel_key = tuple(voxel)
if voxel_key in self.voxel_types and self.voxel_types[voxel_key] == VoxelType.UNKNOWN:
self.unknown_voxels = [v for v in self.unknown_voxels if not np.array_equal(v, voxel)]
self.voxel_types[voxel_key] = VoxelType.EMPTY
self.empty_voxels.append(voxel)
def generate_ray_directions(self):
directions = []
if self.bbx_min is not None and self.bbx_max is not None:
bbx_diagonal = np.linalg.norm(self.bbx_max - self.bbx_min)
hemisphere_radius = self.camera_working_distance + bbx_diagonal / 2
else:
hemisphere_radius = self.camera_working_distance
        # Sample directions more densely than the nominal parallel/viewpoint counts
        theta_step = np.pi / (6 * self.num_parallels)  # finer theta step
        phi_step = np.pi / (6 * self.viewpoints_per_parallel)  # finer phi step
        # Sample from the top of the hemisphere down to theta = pi/6
        for theta in np.arange(0, np.pi/6 + theta_step, theta_step):
            # Sample all longitudes on each parallel
for phi in np.arange(0, 2*np.pi, phi_step):
x = hemisphere_radius * np.sin(theta) * np.cos(phi)
y = hemisphere_radius * np.sin(theta) * np.sin(phi)
z = hemisphere_radius * np.cos(theta)
direction = np.array([-x, -y, -z])
direction = direction / np.linalg.norm(direction)
directions.append(direction)
return directions
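    # A frontier voxel is an UNKNOWN voxel with at least one EMPTY and at
    # least one OCCUPIED voxel among its 26 neighbors, i.e. it lies on the
    # boundary between observed free space and the unexplored surface.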
def update_frontier_voxels(self):
self.frontier_voxels = []
remaining_unknown = []
for voxel in self.unknown_voxels:
neighbors = self.find_neighbors(voxel)
has_empty = any(self.voxel_types.get(tuple(n), VoxelType.NONE) == VoxelType.EMPTY for n in neighbors)
has_occupied = any(self.voxel_types.get(tuple(n), VoxelType.NONE) == VoxelType.OCCUPIED for n in neighbors)
if has_empty and has_occupied:
self.voxel_types[tuple(voxel)] = VoxelType.FRONTIER
self.frontier_voxels.append(voxel)
else:
remaining_unknown.append(voxel)
self.unknown_voxels = remaining_unknown
def is_in_bounding_box(self, point: np.ndarray) -> bool:
if self.bbx_min is None or self.bbx_max is None:
return False
return np.all(point >= self.bbx_min) and np.all(point <= self.bbx_max)
def get_voxel_coordinate(self, point: np.ndarray) -> np.ndarray:
return (point / self.voxel_resolution).astype(int) * self.voxel_resolution
def voxelize_points(self, points: np.ndarray) -> List[np.ndarray]:
voxel_coords = (points / self.voxel_resolution).astype(int)
unique_voxels = np.unique(voxel_coords, axis=0)
voxels = [voxel * self.voxel_resolution for voxel in unique_voxels]
for voxel in voxels:
self.voxel_types[tuple(voxel)] = VoxelType.OCCUPIED
return voxels
def is_occupied(self, voxel: np.ndarray) -> bool:
return self.voxel_types.get(tuple(voxel), VoxelType.NONE) == VoxelType.OCCUPIED
def find_neighbors(self, voxel: np.ndarray) -> List[np.ndarray]:
neighbors = []
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
for dz in [-1, 0, 1]:
if dx == 0 and dy == 0 and dz == 0:
continue
neighbor = voxel + np.array([dx, dy, dz]) * self.voxel_resolution
neighbors.append(neighbor)
return neighbors
def update_bounding_box(self):
if not self.occupied_voxels:
return
occupied_array = np.array(self.occupied_voxels)
self.bbx_min = occupied_array.min(axis=0) - 2 * self.voxel_resolution
self.bbx_max = occupied_array.max(axis=0) + 2 * self.voxel_resolution
def transform_points(self, points: np.ndarray, transform: np.ndarray) -> np.ndarray:
ones = np.ones((points.shape[0], 1))
points_homo = np.hstack((points, ones))
transformed = (transform @ points_homo.T).T
return transformed[:, :3]
    def create_voxel_geometry(self, voxels, color, voxel_size):
import open3d as o3d
points = np.array(voxels)
if len(points) == 0:
return None
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)
pcd.colors = o3d.utility.Vector3dVector(np.tile(color, (len(points), 1)))
return pcd
    def create_ray_geometry(self, camera_pos, directions, camera_rot, length=1.0):
import open3d as o3d
lines = []
colors = []
        for direction in directions:
            # Rotate the direction vector into the camera frame
            direction_cam = camera_rot @ direction
            end_point = camera_pos - direction_cam * length
            lines.append([camera_pos, end_point])
            colors.append([0.5, 0.5, 0.5])  # gray rays
line_set = o3d.geometry.LineSet()
line_set.points = o3d.utility.Vector3dVector(np.array(lines).reshape(-1, 3))
line_set.lines = o3d.utility.Vector2iVector(np.array([[i*2, i*2+1] for i in range(len(lines))]))
line_set.colors = o3d.utility.Vector3dVector(colors)
return line_set
def visualize_voxel_struct(self, camera_pose: np.ndarray = None):
import open3d as o3d
vis = o3d.visualization.Visualizer()
vis.create_window()
coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0])
vis.add_geometry(coordinate_frame)
        # Occupied voxels (blue)
occupied_voxels = self.create_voxel_geometry(
self.occupied_voxels,
[0, 0, 1],
self.voxel_resolution
)
if occupied_voxels:
vis.add_geometry(occupied_voxels)
        # Empty voxels (green)
empty_voxels = self.create_voxel_geometry(
self.empty_voxels,
[0, 1, 0],
self.voxel_resolution
)
if empty_voxels:
vis.add_geometry(empty_voxels)
        # Unknown voxels (gray)
unknown_voxels = self.create_voxel_geometry(
self.unknown_voxels,
[0.5, 0.5, 0.5],
self.voxel_resolution
)
if unknown_voxels:
vis.add_geometry(unknown_voxels)
        # Frontier voxels (red)
frontier_voxels = self.create_voxel_geometry(
self.frontier_voxels,
[1, 0, 0],
self.voxel_resolution
)
if frontier_voxels:
vis.add_geometry(frontier_voxels)
        # Traced rays from the camera
if camera_pose is not None:
directions = self.generate_ray_directions()
rays = self.create_ray_geometry(
camera_pose[:3, 3],
directions,
camera_pose[:3, :3],
                length=0.5  # ray length
)
vis.add_geometry(rays)
opt = vis.get_render_option()
opt.background_color = np.asarray([0.8, 0.8, 0.8])
opt.point_size = 5.0
vis.run()
vis.destroy_window()
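# A minimal usage sketch of VoxelStruct (hypothetical point cloud and camera
# pose; kept as a comment so importing this module stays side-effect free):
#
#   vs = VoxelStruct(voxel_resolution=0.01)
#   cam_pose = np.eye(4)                    # camera at the world origin
#   pts = np.random.rand(1000, 3) * 0.1     # points in the camera frame
#   frontier, occupied = vs.update_voxel_map(pts, cam_pose)
#   vs.visualize_voxel_struct(cam_pose)     # requires open3d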
class PBNBV:
def __init__(self, voxel_resolution=0.01, camera_intrinsic=None):
self.voxel_resolution = voxel_resolution
self.voxel_struct = VoxelStruct(voxel_resolution)
self.camera_intrinsic = camera_intrinsic or np.array([
[902.14, 0, 320],
[0, 902.14, 200],
[0, 0, 1]
])
self.focal_length = (self.camera_intrinsic[0,0] + self.camera_intrinsic[1,1]) / 2 / 1000
self.ellipsoids = []
def capture(self, point_cloud: np.ndarray, camera_pose: np.ndarray):
frontier_voxels, occupied_voxels = self.voxel_struct.update_voxel_map(point_cloud, camera_pose)
# self.voxel_struct.visualize_voxel_struct(camera_pose)
self.fit_ellipsoids(frontier_voxels, occupied_voxels)
def reset(self):
self.ellipsoids = []
self.voxel_struct = VoxelStruct(self.voxel_resolution)
def fit_ellipsoids(self, frontier_voxels: List[np.ndarray], occupied_voxels: List[np.ndarray],
max_ellipsoids=10):
self.ellipsoids = []
if not frontier_voxels and not occupied_voxels:
return
if frontier_voxels:
frontier_gmm = self.fit_gmm(np.array(frontier_voxels), max_ellipsoids)
self.ellipsoids.extend(self.gmm_to_ellipsoids(frontier_gmm, "frontier"))
if occupied_voxels:
occupied_gmm = self.fit_gmm(np.array(occupied_voxels), max_ellipsoids)
self.ellipsoids.extend(self.gmm_to_ellipsoids(occupied_gmm, "occupied"))
def fit_gmm(self, data: np.ndarray, max_components: int) -> GaussianMixture:
best_gmm = None
best_bic = np.inf
for n in range(1, min(max_components, len(data)) + 1):
gmm = GaussianMixture(n_components=n, covariance_type='full')
gmm.fit(data)
bic = gmm.bic(data)
if bic < best_bic:
best_bic = bic
best_gmm = gmm
return best_gmm
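    # For a Gaussian component with covariance Σ = V·diag(λ)·Vᵀ, the 3-sigma
    # confidence ellipsoid has principal axes along the columns of V and
    # semi-axis lengths 3·sqrt(λ); gmm_to_ellipsoids below packs V and the
    # component mean into a 4x4 pose and returns the radii alongside it.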
def gmm_to_ellipsoids(self, gmm: GaussianMixture, ellipsoid_type: str) -> List[Dict]:
ellipsoids = []
for i in range(gmm.n_components):
mean = gmm.means_[i]
cov = gmm.covariances_[i]
eigvals, eigvecs = np.linalg.eigh(cov)
radii = np.sqrt(eigvals) * 3
rotation = eigvecs
pose = np.eye(4)
pose[:3, :3] = rotation
pose[:3, 3] = mean
ellipsoids.append({
"type": ellipsoid_type,
"pose": pose,
"radii": radii
})
return ellipsoids
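    # Viewpoint score = weighted projected area of frontier ellipsoids minus
    # that of occupied ellipsoids: a good view sees a lot of unexplored
    # frontier while re-imaging little already-reconstructed surface.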
def evaluate_viewpoint(self, viewpoint_pose: np.ndarray) -> float:
if not self.ellipsoids:
return 0.0
ellipsoid_weights = self.compute_ellipsoid_weights(viewpoint_pose)
projection_scores = []
for ellipsoid, weight in zip(self.ellipsoids, ellipsoid_weights):
score = self.project_ellipsoid(ellipsoid, viewpoint_pose) * weight
projection_scores.append((ellipsoid["type"], score))
frontier_score = sum(s for t, s in projection_scores if t == "frontier")
occupied_score = sum(s for t, s in projection_scores if t == "occupied")
return frontier_score - occupied_score
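    # Ellipsoids are weighted by their depth order in the candidate camera
    # frame: the nearest gets weight 1.0, the next 0.5, then 0.25, ...
    # (0.5 ** rank), so primitives likely to be occluded contribute less.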
def compute_ellipsoid_weights(self, viewpoint_pose: np.ndarray) -> List[float]:
centers_world = np.array([e["pose"][:3, 3] for e in self.ellipsoids])
centers_homo = np.hstack((centers_world, np.ones((len(centers_world), 1))))
centers_cam = (np.linalg.inv(viewpoint_pose) @ centers_homo.T).T[:, :3]
z_coords = centers_cam[:, 2]
sorted_indices = np.argsort(z_coords)
weights = np.zeros(len(self.ellipsoids))
for rank, idx in enumerate(sorted_indices):
weights[idx] = 0.5 ** rank
return weights.tolist()
def project_ellipsoid(self, ellipsoid: Dict, viewpoint_pose: np.ndarray) -> float:
ellipsoid_pose_cam = np.linalg.inv(viewpoint_pose) @ ellipsoid["pose"]
radii = ellipsoid["radii"]
rotation = ellipsoid_pose_cam[:3, :3]
scales = np.diag(radii)
transform = rotation @ scales
major_axis = np.linalg.norm(transform[:, 0])
minor_axis = np.linalg.norm(transform[:, 1])
area = np.pi * major_axis * minor_axis
return area
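    # Candidate viewpoints are spread over a spherical cap around the scene
    # center; the number of views per parallel is proportional to its
    # circumference (2π·sin(lat)·radius), keeping angular spacing roughly even.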
def generate_candidate_views(self, num_views=100, longitude_num=5) -> List[np.ndarray]:
if self.voxel_struct.bbx_min is None:
return []
center = (self.voxel_struct.bbx_min + self.voxel_struct.bbx_max) / 2
radius = np.linalg.norm(self.voxel_struct.bbx_max - self.voxel_struct.bbx_min) / 2 + self.focal_length
candidate_views = []
latitudes = np.linspace(np.deg2rad(40), np.deg2rad(90), longitude_num)
lengths = [2 * np.pi * np.sin(lat) * radius for lat in latitudes]
total_length = sum(lengths)
points_per_lat = [int(round(num_views * l / total_length)) for l in lengths]
for lat, n in zip(latitudes, points_per_lat):
if n == 0:
continue
longitudes = np.linspace(0, 2*np.pi, n, endpoint=False)
for lon in longitudes:
x = radius * np.sin(lat) * np.cos(lon)
y = radius * np.sin(lat) * np.sin(lon)
z = radius * np.cos(lat)
position = np.array([x, y, z]) + center
z_axis = center - position
z_axis /= np.linalg.norm(z_axis)
x_axis = np.cross(z_axis, np.array([0, 0, 1]))
if np.linalg.norm(x_axis) < 1e-6:
x_axis = np.array([1, 0, 0])
x_axis /= np.linalg.norm(x_axis)
y_axis = np.cross(z_axis, x_axis)
y_axis /= np.linalg.norm(y_axis)
rotation = np.column_stack((x_axis, y_axis, z_axis))
view_pose = np.eye(4)
view_pose[:3, :3] = rotation
view_pose[:3, 3] = position
candidate_views.append(view_pose)
return candidate_views
def select_best_view(self) -> np.ndarray:
candidate_views = self.generate_candidate_views()
if not candidate_views:
return np.eye(4)
scores = [self.evaluate_viewpoint(view) for view in candidate_views]
best_idx = np.argmax(scores)
return candidate_views[best_idx]
def execute(self) -> Tuple[np.ndarray, bool]:
best_view = self.select_best_view()
has_frontier = any(e["type"] == "frontier" for e in self.ellipsoids)
done = not has_frontier
return best_view, done
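# A minimal sketch of the capture/plan loop (acquire_point_cloud is a
# hypothetical sensor call; kept as a comment so the module imports cleanly):
#
#   pbnbv = PBNBV(voxel_resolution=0.01)
#   pose = np.eye(4)
#   for _ in range(20):
#       cloud = acquire_point_cloud(pose)   # hypothetical capture
#       pbnbv.capture(cloud, pose)          # cloud is transformed by pose internally
#       pose, done = pbnbv.execute()        # next best view + termination flag
#       if done:                            # no frontier ellipsoids remain
#           break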
import os
import json
from utils.render import RenderUtil
from utils.pose import PoseUtil
from utils.pts import PtsUtil
from utils.reconstruction import ReconstructionUtil
from beans.predict_result import PredictResult
from tqdm import tqdm
import numpy as np
import pickle
from PytorchBoot.config import ConfigManager
import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory import ComponentFactory
from PytorchBoot.dataset import BaseDataset
from PytorchBoot.runners.runner import Runner
from PytorchBoot.utils import Log
from PytorchBoot.status import status_manager
from utils.data_load import DataLoadUtil
@stereotype.runner("evaluate_pbnbv")
class EvaluatePBNBV(Runner):
def __init__(self, config_path):
super().__init__(config_path)
self.script_path = ConfigManager.get(namespace.Stereotype.RUNNER, "blender_script_path")
self.output_dir = ConfigManager.get(namespace.Stereotype.RUNNER, "output_dir")
self.voxel_size = ConfigManager.get(namespace.Stereotype.RUNNER, "voxel_size")
self.min_new_area = ConfigManager.get(namespace.Stereotype.RUNNER, "min_new_area")
CM = 0.01
self.min_new_pts_num = self.min_new_area * (CM / self.voxel_size) ** 2
self.overlap_limit = ConfigManager.get(namespace.Stereotype.RUNNER, "overlap_limit")
self.pbnbv = PBNBV(self.voxel_size)
''' Experiment '''
self.load_experiment("nbv_evaluator")
self.stat_result_path = os.path.join(self.output_dir, "stat.json")
if os.path.exists(self.stat_result_path):
with open(self.stat_result_path, "r") as f:
self.stat_result = json.load(f)
else:
self.stat_result = {}
''' Test '''
self.test_config = ConfigManager.get(namespace.Stereotype.RUNNER, namespace.Mode.TEST)
self.test_dataset_name_list = self.test_config["dataset_list"]
self.test_set_list = []
self.test_writer_list = []
seen_name = set()
for test_dataset_name in self.test_dataset_name_list:
if test_dataset_name not in seen_name:
seen_name.add(test_dataset_name)
else:
raise ValueError("Duplicate test dataset name: {}".format(test_dataset_name))
test_set: BaseDataset = ComponentFactory.create(namespace.Stereotype.DATASET, test_dataset_name)
self.test_set_list.append(test_set)
self.print_info()
def run(self):
Log.info("Loading from epoch {}.".format(self.current_epoch))
self.inference()
Log.success("Inference finished.")
def inference(self):
#self.pipeline.eval()
test_set: BaseDataset
for dataset_idx, test_set in enumerate(self.test_set_list):
status_manager.set_progress("inference", "inferencer", f"dataset", dataset_idx, len(self.test_set_list))
test_set_name = test_set.get_name()
total=int(len(test_set))
for i in tqdm(range(total), desc=f"Processing {test_set_name}", ncols=100):
try:
self.pbnbv.reset()
data = test_set.__getitem__(i)
scene_name = data["scene_name"]
inference_result_path = os.path.join(self.output_dir, test_set_name, f"{scene_name}.pkl")
if os.path.exists(inference_result_path):
Log.info(f"Inference result already exists for scene: {scene_name}")
continue
status_manager.set_progress("inference", "inferencer", f"Batch[{test_set_name}]", i+1, total)
output = self.predict_sequence(data)
self.save_inference_result(test_set_name, data["scene_name"], output)
except Exception as e:
print(e)
Log.error(f"Error, {e}")
continue
status_manager.set_progress("inference", "inferencer", f"dataset", len(self.test_set_list), len(self.test_set_list))
    def get_output_data(self):
        pose_matrix, done = self.pbnbv.execute()
        # 180-degree rotation about the x-axis (flips y and z), commonly used
        # to convert between camera-frame conventions
        offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
        pose_matrix = pose_matrix @ offset
        rot = pose_matrix[:3, :3]
        pose_6d = PoseUtil.matrix_to_rotation_6d_numpy(rot)
        translation = pose_matrix[:3, 3]
        pose_9d = np.concatenate([pose_6d, translation], axis=0).reshape(1, 9)
        # Repeat the single predicted pose to the (50, 9) shape the downstream
        # consumer expects
        pose_9d = pose_9d.repeat(50, axis=0)
        return {"pred_pose_9d": pose_9d}
def predict_sequence(self, data, cr_increase_threshold=0, overlap_area_threshold=25, scan_points_threshold=10, max_iter=50, max_retry = 10, max_success=3):
scene_name = data["scene_name"]
Log.info(f"Processing scene: {scene_name}")
status_manager.set_status("inference", "inferencer", "scene", scene_name)
''' data for rendering '''
scene_path = data["scene_path"]
O_to_L_pose = data["O_to_L_pose"]
voxel_threshold = self.voxel_size
filter_degree = 75
down_sampled_model_pts = data["gt_pts"]
first_frame_to_world_9d = data["first_scanned_n_to_world_pose_9d"][0]
first_frame_to_world = np.eye(4)
first_frame_to_world[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(first_frame_to_world_9d[:6])
first_frame_to_world[:3,3] = first_frame_to_world_9d[6:]
self.pbnbv.capture(data["first_scanned_pts"][0], first_frame_to_world)
''' data for inference '''
input_data = {}
input_data["combined_scanned_pts"] = np.array(data["first_scanned_pts"][0], dtype=np.float32)
input_data["scanned_pts"] = [np.array(data["first_scanned_pts"][0], dtype=np.float32)]
input_data["scanned_pts_mask"] = [np.zeros(input_data["combined_scanned_pts"].shape[0], dtype=np.bool_)]
input_data["scanned_n_to_world_pose_9d"] = [np.array(data["first_scanned_n_to_world_pose_9d"], dtype=np.float32)]
input_data["mode"] = namespace.Mode.TEST
input_pts_N = input_data["combined_scanned_pts"].shape[0]
root = os.path.dirname(scene_path)
display_table_info = DataLoadUtil.get_display_table_info(root, scene_name)
radius = display_table_info["radius"]
scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius))
first_frame_target_pts, first_frame_target_normals, first_frame_scan_points_indices = RenderUtil.render_pts(first_frame_to_world, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose)
scanned_view_pts = [first_frame_target_pts]
history_indices = [first_frame_scan_points_indices]
last_pred_cr, added_pts_num = self.compute_coverage_rate(scanned_view_pts, None, down_sampled_model_pts, threshold=voxel_threshold)
retry_duplication_pose = []
retry_no_pts_pose = []
retry_overlap_pose = []
retry = 0
pred_cr_seq = [last_pred_cr]
success = 0
last_pts_num = PtsUtil.voxel_downsample_point_cloud(data["first_scanned_pts"][0], voxel_threshold).shape[0]
#import time
while len(pred_cr_seq) < max_iter and retry < max_retry and success < max_success:
#import ipdb; ipdb.set_trace()
Log.green(f"iter: {len(pred_cr_seq)}, retry: {retry}/{max_retry}, success: {success}/{max_success}")
combined_scanned_pts = np.vstack(scanned_view_pts)
voxel_downsampled_combined_scanned_pts_np, inverse = self.voxel_downsample_with_mapping(combined_scanned_pts, voxel_threshold)
output = self.get_output_data()
pred_pose_9d = output["pred_pose_9d"]
pred_pose = np.eye(4)
predict_result = PredictResult(pred_pose_9d, input_pts=input_data["combined_scanned_pts"], cluster_params=dict(eps=0.25, min_samples=3))
            # ---------------- debug only (disabled) ----------------
            # import ipdb; ipdb.set_trace()
            # predict_result.visualize()
            # --------------------------------------------------------
pred_pose_9d_candidates = predict_result.candidate_9d_poses
#import ipdb; ipdb.set_trace()
for pred_pose_9d in pred_pose_9d_candidates:
#import ipdb; ipdb.set_trace()
pred_pose_9d = np.array(pred_pose_9d, dtype=np.float32)
pred_pose[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(pred_pose_9d[:6])
pred_pose[:3,3] = pred_pose_9d[6:]
try:
new_target_pts, new_target_normals, new_scan_points_indices = RenderUtil.render_pts(pred_pose, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose)
#import ipdb; ipdb.set_trace()
if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold):
curr_overlap_area_threshold = overlap_area_threshold
else:
curr_overlap_area_threshold = overlap_area_threshold * 0.5
downsampled_new_target_pts = PtsUtil.voxel_downsample_point_cloud(new_target_pts, voxel_threshold)
#import ipdb; ipdb.set_trace()
if self.overlap_limit:
overlap, _ = ReconstructionUtil.check_overlap(downsampled_new_target_pts, voxel_downsampled_combined_scanned_pts_np, overlap_area_threshold = curr_overlap_area_threshold, voxel_size=voxel_threshold, require_new_added_pts_num = True)
if not overlap:
Log.yellow("no overlap!")
retry += 1
retry_overlap_pose.append(pred_pose.tolist())
continue
history_indices.append(new_scan_points_indices)
except Exception as e:
Log.error(f"Error in scene {scene_path}, {e}")
print("current pose: ", pred_pose)
print("curr_pred_cr: ", last_pred_cr)
retry_no_pts_pose.append(pred_pose.tolist())
retry += 1
continue
if new_target_pts.shape[0] == 0:
Log.red("no pts in new target")
retry_no_pts_pose.append(pred_pose.tolist())
retry += 1
continue
pred_cr, _ = self.compute_coverage_rate(scanned_view_pts, new_target_pts, down_sampled_model_pts, threshold=voxel_threshold)
                Log.yellow(f"pred_cr: {pred_cr}, last_pred_cr: {last_pred_cr}, max: {data['seq_max_coverage_rate']}")
if pred_cr >= data["seq_max_coverage_rate"] - 1e-3:
print("max coverage rate reached!: ", pred_cr)
pred_cr_seq.append(pred_cr)
scanned_view_pts.append(new_target_pts)
pred_pose_9d = pred_pose_9d.reshape(1, -1)
input_data["scanned_n_to_world_pose_9d"] = [np.concatenate([input_data["scanned_n_to_world_pose_9d"][0], pred_pose_9d], axis=0)]
combined_scanned_pts = np.vstack(scanned_view_pts)
voxel_downsampled_combined_scanned_pts_np = PtsUtil.voxel_downsample_point_cloud(combined_scanned_pts, voxel_threshold)
random_downsampled_combined_scanned_pts_np = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, input_pts_N)
self.pbnbv.capture(np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32), pred_pose)
input_data["combined_scanned_pts"] = np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32)
input_data["scanned_pts"] = [np.concatenate([input_data["scanned_pts"][0], np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32)], axis=0)]
last_pred_cr = pred_cr
pts_num = voxel_downsampled_combined_scanned_pts_np.shape[0]
                Log.info(f"delta pts num: {pts_num - last_pts_num}, {pts_num}, {last_pts_num}")
                if pts_num - last_pts_num < self.min_new_pts_num and pred_cr <= data["seq_max_coverage_rate"] - 1e-2:
                    retry += 1
                    retry_duplication_pose.append(pred_pose.tolist())
                    Log.red(f"delta pts num < {self.min_new_pts_num}: {pts_num}, {last_pts_num}")
                elif pts_num - last_pts_num < self.min_new_pts_num and pred_cr > data["seq_max_coverage_rate"] - 1e-2:
                    success += 1
                    Log.success(f"delta pts num < {self.min_new_pts_num}: {pts_num}, {last_pts_num}")
last_pts_num = pts_num
input_data["scanned_n_to_world_pose_9d"] = input_data["scanned_n_to_world_pose_9d"][0].tolist()
result = {
"pred_pose_9d_seq": input_data["scanned_n_to_world_pose_9d"],
"combined_scanned_pts": input_data["combined_scanned_pts"],
"target_pts_seq": scanned_view_pts,
"coverage_rate_seq": pred_cr_seq,
"max_coverage_rate": data["seq_max_coverage_rate"],
"pred_max_coverage_rate": max(pred_cr_seq),
"scene_name": scene_name,
"retry_no_pts_pose": retry_no_pts_pose,
"retry_duplication_pose": retry_duplication_pose,
"retry_overlap_pose": retry_overlap_pose,
"best_seq_len": data["best_seq_len"],
}
self.stat_result[scene_name] = {
"coverage_rate_seq": pred_cr_seq,
"pred_max_coverage_rate": max(pred_cr_seq),
"pred_seq_len": len(pred_cr_seq),
}
        print("pred max coverage rate: ", max(pred_cr_seq))
return result
def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003):
voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
idx_sort = np.argsort(inverse)
idx_unique = idx_sort[np.cumsum(counts)-counts]
downsampled_points = point_cloud[idx_unique]
return downsampled_points, inverse
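    # Example: with voxel_size=1.0 and points [[0.2, 0, 0], [0.4, 0, 0],
    # [1.3, 0, 0]], the first two points share voxel (0, 0, 0) and the third
    # falls in (1, 0, 0); the method above returns one representative point
    # per voxel ([0.2, 0, 0] and [1.3, 0, 0]) together with inverse = [0, 0, 1],
    # which maps every input point back to its voxel.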
def compute_coverage_rate(self, scanned_view_pts, new_pts, model_pts, threshold=0.005):
if new_pts is not None:
new_scanned_view_pts = scanned_view_pts + [new_pts]
else:
new_scanned_view_pts = scanned_view_pts
combined_point_cloud = np.vstack(new_scanned_view_pts)
down_sampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud,threshold)
return ReconstructionUtil.compute_coverage_rate(model_pts, down_sampled_combined_point_cloud, threshold)
def save_inference_result(self, dataset_name, scene_name, output):
dataset_dir = os.path.join(self.output_dir, dataset_name)
if not os.path.exists(dataset_dir):
os.makedirs(dataset_dir)
output_path = os.path.join(dataset_dir, f"{scene_name}.pkl")
pickle.dump(output, open(output_path, "wb"))
with open(self.stat_result_path, "w") as f:
json.dump(self.stat_result, f)
def get_checkpoint_path(self, is_last=False):
return os.path.join(self.experiment_path, namespace.Direcotry.CHECKPOINT_DIR_NAME,
"Epoch_{}.pth".format(
self.current_epoch if self.current_epoch != -1 and not is_last else "last"))
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
self.current_epoch = self.experiments_config["epoch"]
#self.load_checkpoint(is_last=(self.current_epoch == -1))
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load(self, path):
        # If any data still needs to be loaded here, numpy's load can be used
pass
def print_info(self):
def print_dataset(dataset: BaseDataset):
config = dataset.get_config()
name = dataset.get_name()
Log.blue(f"Dataset: {name}")
for k,v in config.items():
Log.blue(f"\t{k}: {v}")
super().print_info()
table_size = 70
Log.blue(f"{'+' + '-' * (table_size // 2)} Pipeline {'-' * (table_size // 2)}" + '+')
#Log.blue(self.pipeline)
Log.blue(f"{'+' + '-' * (table_size // 2)} Datasets {'-' * (table_size // 2)}" + '+')
for i, test_set in enumerate(self.test_set_list):
Log.blue(f"test dataset {i}: ")
print_dataset(test_set)
Log.blue(f"{'+' + '-' * (table_size // 2)}----------{'-' * (table_size // 2)}" + '+')