import numpy as np from sklearn.mixture import GaussianMixture from typing import List, Tuple, Dict from enum import Enum class VoxelType(Enum): NONE = 0 OCCUPIED = 1 EMPTY = 2 UNKNOWN = 3 FRONTIER = 4 class VoxelStruct: def __init__(self, voxel_resolution=0.01, ray_trace_step=0.01, surrounding_radius=1, num_parallels=10, viewpoints_per_parallel=10, camera_working_distance=0.5): self.voxel_resolution = voxel_resolution self.ray_trace_step = ray_trace_step self.surrounding_radius = surrounding_radius self.num_parallels = num_parallels self.viewpoints_per_parallel = viewpoints_per_parallel self.camera_working_distance = camera_working_distance self.occupied_voxels = [] self.empty_voxels = [] self.unknown_voxels = [] self.frontier_voxels = [] self.bbx_min = None self.bbx_max = None self.voxel_types: Dict[Tuple[float, float, float], VoxelType] = {} def update_voxel_map(self, points: np.ndarray, camera_pose: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]: points = self.transform_points(points, camera_pose) new_occupied = self.voxelize_points(points) self.occupied_voxels.extend(new_occupied) self.update_bounding_box() self.ray_tracing(camera_pose[:3, 3], camera_pose[:3, :3]) self.update_frontier_voxels() return self.frontier_voxels, self.occupied_voxels def ray_tracing(self, camera_position: np.ndarray, camera_rotation: np.ndarray): if self.bbx_min is None or self.bbx_max is None: return directions = self.generate_ray_directions() for direction in directions: direction_cam = camera_rotation @ direction current_pos = camera_position.copy() cnt = 0 while not self.is_in_bounding_box(current_pos): current_pos -= direction_cam * self.ray_trace_step*2 cnt += 1 if cnt > 200: break occupied_flag = False maybe_unknown_voxels = [] while self.is_in_bounding_box(current_pos): voxel = self.get_voxel_coordinate(current_pos) voxel_key = tuple(voxel) if self.is_occupied(voxel): current_pos -= direction_cam * self.ray_trace_step occupied_flag = True continue if not occupied_flag: if voxel_key not in self.voxel_types or self.voxel_types[voxel_key] == VoxelType.NONE or self.voxel_types[voxel_key] == VoxelType.UNKNOWN: maybe_unknown_voxels.append(voxel) else: if voxel_key not in self.voxel_types or self.voxel_types[voxel_key] == VoxelType.NONE: self.voxel_types[voxel_key] = VoxelType.UNKNOWN self.unknown_voxels.append(voxel) current_pos -= direction_cam * self.ray_trace_step if not occupied_flag: for voxel in maybe_unknown_voxels: self.voxel_types[tuple(voxel)] = VoxelType.UNKNOWN self.unknown_voxels.append(voxel) else: for voxel in maybe_unknown_voxels: voxel_key = tuple(voxel) if voxel_key in self.voxel_types and self.voxel_types[voxel_key] == VoxelType.UNKNOWN: self.unknown_voxels = [v for v in self.unknown_voxels if not np.array_equal(v, voxel)] self.voxel_types[voxel_key] = VoxelType.EMPTY self.empty_voxels.append(voxel) def generate_ray_directions(self): directions = [] if self.bbx_min is not None and self.bbx_max is not None: bbx_diagonal = np.linalg.norm(self.bbx_max - self.bbx_min) hemisphere_radius = self.camera_working_distance + bbx_diagonal / 2 else: hemisphere_radius = self.camera_working_distance # 使用更密集的采样 theta_step = np.pi / (6 * self.num_parallels) # 减小theta的步长 phi_step = np.pi / (6 * self.viewpoints_per_parallel) # 减小phi的步长 # 从顶部到底部采样 for theta in np.arange(0, np.pi/6 + theta_step, theta_step): # 在每个纬度上采样 for phi in np.arange(0, 2*np.pi, phi_step): x = hemisphere_radius * np.sin(theta) * np.cos(phi) y = hemisphere_radius * np.sin(theta) * np.sin(phi) z = hemisphere_radius * np.cos(theta) direction = np.array([-x, -y, -z]) direction = direction / np.linalg.norm(direction) directions.append(direction) return directions def update_frontier_voxels(self): self.frontier_voxels = [] remaining_unknown = [] for voxel in self.unknown_voxels: neighbors = self.find_neighbors(voxel) has_empty = any(self.voxel_types.get(tuple(n), VoxelType.NONE) == VoxelType.EMPTY for n in neighbors) has_occupied = any(self.voxel_types.get(tuple(n), VoxelType.NONE) == VoxelType.OCCUPIED for n in neighbors) if has_empty and has_occupied: self.voxel_types[tuple(voxel)] = VoxelType.FRONTIER self.frontier_voxels.append(voxel) else: remaining_unknown.append(voxel) self.unknown_voxels = remaining_unknown def is_in_bounding_box(self, point: np.ndarray) -> bool: if self.bbx_min is None or self.bbx_max is None: return False return np.all(point >= self.bbx_min) and np.all(point <= self.bbx_max) def get_voxel_coordinate(self, point: np.ndarray) -> np.ndarray: return (point / self.voxel_resolution).astype(int) * self.voxel_resolution def voxelize_points(self, points: np.ndarray) -> List[np.ndarray]: voxel_coords = (points / self.voxel_resolution).astype(int) unique_voxels = np.unique(voxel_coords, axis=0) voxels = [voxel * self.voxel_resolution for voxel in unique_voxels] for voxel in voxels: self.voxel_types[tuple(voxel)] = VoxelType.OCCUPIED return voxels def is_occupied(self, voxel: np.ndarray) -> bool: return self.voxel_types.get(tuple(voxel), VoxelType.NONE) == VoxelType.OCCUPIED def find_neighbors(self, voxel: np.ndarray) -> List[np.ndarray]: neighbors = [] for dx in [-1, 0, 1]: for dy in [-1, 0, 1]: for dz in [-1, 0, 1]: if dx == 0 and dy == 0 and dz == 0: continue neighbor = voxel + np.array([dx, dy, dz]) * self.voxel_resolution neighbors.append(neighbor) return neighbors def update_bounding_box(self): if not self.occupied_voxels: return occupied_array = np.array(self.occupied_voxels) self.bbx_min = occupied_array.min(axis=0) - 2 * self.voxel_resolution self.bbx_max = occupied_array.max(axis=0) + 2 * self.voxel_resolution def transform_points(self, points: np.ndarray, transform: np.ndarray) -> np.ndarray: ones = np.ones((points.shape[0], 1)) points_homo = np.hstack((points, ones)) transformed = (transform @ points_homo.T).T return transformed[:, :3] def create_voxel_geometry(self,voxels, color, voxel_size): import open3d as o3d points = np.array(voxels) if len(points) == 0: return None pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points) pcd.colors = o3d.utility.Vector3dVector(np.tile(color, (len(points), 1))) return pcd def create_ray_geometry(self,camera_pos, directions, camera_rot, length=1.0): import open3d as o3d lines = [] colors = [] for direction in directions: # 将方向向量转换到相机坐标系 direction_cam = camera_rot @ direction end_point = camera_pos - direction_cam * length lines.append([camera_pos, end_point]) colors.append([0.5, 0.5, 0.5]) # 灰色光线 line_set = o3d.geometry.LineSet() line_set.points = o3d.utility.Vector3dVector(np.array(lines).reshape(-1, 3)) line_set.lines = o3d.utility.Vector2iVector(np.array([[i*2, i*2+1] for i in range(len(lines))])) line_set.colors = o3d.utility.Vector3dVector(colors) return line_set def visualize_voxel_struct(self, camera_pose: np.ndarray = None): import open3d as o3d vis = o3d.visualization.Visualizer() vis.create_window() coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0]) vis.add_geometry(coordinate_frame) # 显示已占据的体素(蓝色) occupied_voxels = self.create_voxel_geometry( self.occupied_voxels, [0, 0, 1], self.voxel_resolution ) if occupied_voxels: vis.add_geometry(occupied_voxels) # 显示空体素(绿色) empty_voxels = self.create_voxel_geometry( self.empty_voxels, [0, 1, 0], self.voxel_resolution ) if empty_voxels: vis.add_geometry(empty_voxels) # 显示未知体素(灰色) unknown_voxels = self.create_voxel_geometry( self.unknown_voxels, [0.5, 0.5, 0.5], self.voxel_resolution ) if unknown_voxels: vis.add_geometry(unknown_voxels) # 显示frontier体素(红色) frontier_voxels = self.create_voxel_geometry( self.frontier_voxels, [1, 0, 0], self.voxel_resolution ) if frontier_voxels: vis.add_geometry(frontier_voxels) # 显示光线 if camera_pose is not None: directions = self.generate_ray_directions() rays = self.create_ray_geometry( camera_pose[:3, 3], directions, camera_pose[:3, :3], length=0.5 # 光线长度 ) vis.add_geometry(rays) opt = vis.get_render_option() opt.background_color = np.asarray([0.8, 0.8, 0.8]) opt.point_size = 5.0 vis.run() vis.destroy_window() class PBNBV: def __init__(self, voxel_resolution=0.01, camera_intrinsic=None): self.voxel_resolution = voxel_resolution self.voxel_struct = VoxelStruct(voxel_resolution) self.camera_intrinsic = camera_intrinsic or np.array([ [902.14, 0, 320], [0, 902.14, 200], [0, 0, 1] ]) self.focal_length = (self.camera_intrinsic[0,0] + self.camera_intrinsic[1,1]) / 2 / 1000 self.ellipsoids = [] def capture(self, point_cloud: np.ndarray, camera_pose: np.ndarray): frontier_voxels, occupied_voxels = self.voxel_struct.update_voxel_map(point_cloud, camera_pose) # self.voxel_struct.visualize_voxel_struct(camera_pose) self.fit_ellipsoids(frontier_voxels, occupied_voxels) def reset(self): self.ellipsoids = [] self.voxel_struct = VoxelStruct(self.voxel_resolution) def fit_ellipsoids(self, frontier_voxels: List[np.ndarray], occupied_voxels: List[np.ndarray], max_ellipsoids=10): self.ellipsoids = [] if not frontier_voxels and not occupied_voxels: return if frontier_voxels: frontier_gmm = self.fit_gmm(np.array(frontier_voxels), max_ellipsoids) self.ellipsoids.extend(self.gmm_to_ellipsoids(frontier_gmm, "frontier")) if occupied_voxels: occupied_gmm = self.fit_gmm(np.array(occupied_voxels), max_ellipsoids) self.ellipsoids.extend(self.gmm_to_ellipsoids(occupied_gmm, "occupied")) def fit_gmm(self, data: np.ndarray, max_components: int) -> GaussianMixture: best_gmm = None best_bic = np.inf for n in range(1, min(max_components, len(data)) + 1): gmm = GaussianMixture(n_components=n, covariance_type='full') gmm.fit(data) bic = gmm.bic(data) if bic < best_bic: best_bic = bic best_gmm = gmm return best_gmm def gmm_to_ellipsoids(self, gmm: GaussianMixture, ellipsoid_type: str) -> List[Dict]: ellipsoids = [] for i in range(gmm.n_components): mean = gmm.means_[i] cov = gmm.covariances_[i] eigvals, eigvecs = np.linalg.eigh(cov) radii = np.sqrt(eigvals) * 3 rotation = eigvecs pose = np.eye(4) pose[:3, :3] = rotation pose[:3, 3] = mean ellipsoids.append({ "type": ellipsoid_type, "pose": pose, "radii": radii }) return ellipsoids def evaluate_viewpoint(self, viewpoint_pose: np.ndarray) -> float: if not self.ellipsoids: return 0.0 ellipsoid_weights = self.compute_ellipsoid_weights(viewpoint_pose) projection_scores = [] for ellipsoid, weight in zip(self.ellipsoids, ellipsoid_weights): score = self.project_ellipsoid(ellipsoid, viewpoint_pose) * weight projection_scores.append((ellipsoid["type"], score)) frontier_score = sum(s for t, s in projection_scores if t == "frontier") occupied_score = sum(s for t, s in projection_scores if t == "occupied") return frontier_score - occupied_score def compute_ellipsoid_weights(self, viewpoint_pose: np.ndarray) -> List[float]: centers_world = np.array([e["pose"][:3, 3] for e in self.ellipsoids]) centers_homo = np.hstack((centers_world, np.ones((len(centers_world), 1)))) centers_cam = (np.linalg.inv(viewpoint_pose) @ centers_homo.T).T[:, :3] z_coords = centers_cam[:, 2] sorted_indices = np.argsort(z_coords) weights = np.zeros(len(self.ellipsoids)) for rank, idx in enumerate(sorted_indices): weights[idx] = 0.5 ** rank return weights.tolist() def project_ellipsoid(self, ellipsoid: Dict, viewpoint_pose: np.ndarray) -> float: ellipsoid_pose_cam = np.linalg.inv(viewpoint_pose) @ ellipsoid["pose"] radii = ellipsoid["radii"] rotation = ellipsoid_pose_cam[:3, :3] scales = np.diag(radii) transform = rotation @ scales major_axis = np.linalg.norm(transform[:, 0]) minor_axis = np.linalg.norm(transform[:, 1]) area = np.pi * major_axis * minor_axis return area def generate_candidate_views(self, num_views=100, longitude_num=5) -> List[np.ndarray]: if self.voxel_struct.bbx_min is None: return [] center = (self.voxel_struct.bbx_min + self.voxel_struct.bbx_max) / 2 radius = np.linalg.norm(self.voxel_struct.bbx_max - self.voxel_struct.bbx_min) / 2 + self.focal_length candidate_views = [] latitudes = np.linspace(np.deg2rad(40), np.deg2rad(90), longitude_num) lengths = [2 * np.pi * np.sin(lat) * radius for lat in latitudes] total_length = sum(lengths) points_per_lat = [int(round(num_views * l / total_length)) for l in lengths] for lat, n in zip(latitudes, points_per_lat): if n == 0: continue longitudes = np.linspace(0, 2*np.pi, n, endpoint=False) for lon in longitudes: x = radius * np.sin(lat) * np.cos(lon) y = radius * np.sin(lat) * np.sin(lon) z = radius * np.cos(lat) position = np.array([x, y, z]) + center z_axis = center - position z_axis /= np.linalg.norm(z_axis) x_axis = np.cross(z_axis, np.array([0, 0, 1])) if np.linalg.norm(x_axis) < 1e-6: x_axis = np.array([1, 0, 0]) x_axis /= np.linalg.norm(x_axis) y_axis = np.cross(z_axis, x_axis) y_axis /= np.linalg.norm(y_axis) rotation = np.column_stack((x_axis, y_axis, z_axis)) view_pose = np.eye(4) view_pose[:3, :3] = rotation view_pose[:3, 3] = position candidate_views.append(view_pose) return candidate_views def select_best_view(self) -> np.ndarray: candidate_views = self.generate_candidate_views() if not candidate_views: return np.eye(4) scores = [self.evaluate_viewpoint(view) for view in candidate_views] best_idx = np.argmax(scores) return candidate_views[best_idx] def execute(self) -> Tuple[np.ndarray, bool]: best_view = self.select_best_view() has_frontier = any(e["type"] == "frontier" for e in self.ellipsoids) done = not has_frontier return best_view, done import os import json from utils.render import RenderUtil from utils.pose import PoseUtil from utils.pts import PtsUtil from utils.reconstruction import ReconstructionUtil from beans.predict_result import PredictResult from tqdm import tqdm import numpy as np import pickle from PytorchBoot.config import ConfigManager import PytorchBoot.namespace as namespace import PytorchBoot.stereotype as stereotype from PytorchBoot.factory import ComponentFactory from PytorchBoot.dataset import BaseDataset from PytorchBoot.runners.runner import Runner from PytorchBoot.utils import Log from PytorchBoot.status import status_manager from utils.data_load import DataLoadUtil @stereotype.runner("evaluate_pbnbv") class EvaluatePBNBV(Runner): def __init__(self, config_path): super().__init__(config_path) self.script_path = ConfigManager.get(namespace.Stereotype.RUNNER, "blender_script_path") self.output_dir = ConfigManager.get(namespace.Stereotype.RUNNER, "output_dir") self.voxel_size = ConfigManager.get(namespace.Stereotype.RUNNER, "voxel_size") self.min_new_area = ConfigManager.get(namespace.Stereotype.RUNNER, "min_new_area") CM = 0.01 self.min_new_pts_num = self.min_new_area * (CM / self.voxel_size) ** 2 self.overlap_limit = ConfigManager.get(namespace.Stereotype.RUNNER, "overlap_limit") self.pbnbv = PBNBV(self.voxel_size) ''' Experiment ''' self.load_experiment("nbv_evaluator") self.stat_result_path = os.path.join(self.output_dir, "stat.json") if os.path.exists(self.stat_result_path): with open(self.stat_result_path, "r") as f: self.stat_result = json.load(f) else: self.stat_result = {} ''' Test ''' self.test_config = ConfigManager.get(namespace.Stereotype.RUNNER, namespace.Mode.TEST) self.test_dataset_name_list = self.test_config["dataset_list"] self.test_set_list = [] self.test_writer_list = [] seen_name = set() for test_dataset_name in self.test_dataset_name_list: if test_dataset_name not in seen_name: seen_name.add(test_dataset_name) else: raise ValueError("Duplicate test dataset name: {}".format(test_dataset_name)) test_set: BaseDataset = ComponentFactory.create(namespace.Stereotype.DATASET, test_dataset_name) self.test_set_list.append(test_set) self.print_info() def run(self): Log.info("Loading from epoch {}.".format(self.current_epoch)) self.inference() Log.success("Inference finished.") def inference(self): #self.pipeline.eval() test_set: BaseDataset for dataset_idx, test_set in enumerate(self.test_set_list): status_manager.set_progress("inference", "inferencer", f"dataset", dataset_idx, len(self.test_set_list)) test_set_name = test_set.get_name() total=int(len(test_set)) for i in tqdm(range(total), desc=f"Processing {test_set_name}", ncols=100): try: self.pbnbv.reset() data = test_set.__getitem__(i) scene_name = data["scene_name"] inference_result_path = os.path.join(self.output_dir, test_set_name, f"{scene_name}.pkl") if os.path.exists(inference_result_path): Log.info(f"Inference result already exists for scene: {scene_name}") continue status_manager.set_progress("inference", "inferencer", f"Batch[{test_set_name}]", i+1, total) output = self.predict_sequence(data) self.save_inference_result(test_set_name, data["scene_name"], output) except Exception as e: print(e) Log.error(f"Error, {e}") continue status_manager.set_progress("inference", "inferencer", f"dataset", len(self.test_set_list), len(self.test_set_list)) def get_output_data(self): pose_matrix, done = self.pbnbv.execute() offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]) pose_matrix = pose_matrix @ offset rot = pose_matrix[:3,:3] pose_6d = PoseUtil.matrix_to_rotation_6d_numpy(rot) translation = pose_matrix[:3, 3] pose_9d = np.concatenate([pose_6d, translation], axis=0).reshape(1,9) pose_9d = pose_9d.repeat(50, axis=0) #import ipdb; ipdb.set_trace() return {"pred_pose_9d": pose_9d} def predict_sequence(self, data, cr_increase_threshold=0, overlap_area_threshold=25, scan_points_threshold=10, max_iter=50, max_retry = 10, max_success=3): scene_name = data["scene_name"] Log.info(f"Processing scene: {scene_name}") status_manager.set_status("inference", "inferencer", "scene", scene_name) ''' data for rendering ''' scene_path = data["scene_path"] O_to_L_pose = data["O_to_L_pose"] voxel_threshold = self.voxel_size filter_degree = 75 down_sampled_model_pts = data["gt_pts"] first_frame_to_world_9d = data["first_scanned_n_to_world_pose_9d"][0] first_frame_to_world = np.eye(4) first_frame_to_world[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(first_frame_to_world_9d[:6]) first_frame_to_world[:3,3] = first_frame_to_world_9d[6:] self.pbnbv.capture(data["first_scanned_pts"][0], first_frame_to_world) ''' data for inference ''' input_data = {} input_data["combined_scanned_pts"] = np.array(data["first_scanned_pts"][0], dtype=np.float32) input_data["scanned_pts"] = [np.array(data["first_scanned_pts"][0], dtype=np.float32)] input_data["scanned_pts_mask"] = [np.zeros(input_data["combined_scanned_pts"].shape[0], dtype=np.bool_)] input_data["scanned_n_to_world_pose_9d"] = [np.array(data["first_scanned_n_to_world_pose_9d"], dtype=np.float32)] input_data["mode"] = namespace.Mode.TEST input_pts_N = input_data["combined_scanned_pts"].shape[0] root = os.path.dirname(scene_path) display_table_info = DataLoadUtil.get_display_table_info(root, scene_name) radius = display_table_info["radius"] scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius)) first_frame_target_pts, first_frame_target_normals, first_frame_scan_points_indices = RenderUtil.render_pts(first_frame_to_world, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose) scanned_view_pts = [first_frame_target_pts] history_indices = [first_frame_scan_points_indices] last_pred_cr, added_pts_num = self.compute_coverage_rate(scanned_view_pts, None, down_sampled_model_pts, threshold=voxel_threshold) retry_duplication_pose = [] retry_no_pts_pose = [] retry_overlap_pose = [] retry = 0 pred_cr_seq = [last_pred_cr] success = 0 last_pts_num = PtsUtil.voxel_downsample_point_cloud(data["first_scanned_pts"][0], voxel_threshold).shape[0] #import time while len(pred_cr_seq) < max_iter and retry < max_retry and success < max_success: #import ipdb; ipdb.set_trace() Log.green(f"iter: {len(pred_cr_seq)}, retry: {retry}/{max_retry}, success: {success}/{max_success}") combined_scanned_pts = np.vstack(scanned_view_pts) voxel_downsampled_combined_scanned_pts_np, inverse = self.voxel_downsample_with_mapping(combined_scanned_pts, voxel_threshold) output = self.get_output_data() pred_pose_9d = output["pred_pose_9d"] pred_pose = np.eye(4) predict_result = PredictResult(pred_pose_9d, input_pts=input_data["combined_scanned_pts"], cluster_params=dict(eps=0.25, min_samples=3)) # ----------------------- import ipdb; ipdb.set_trace() predict_result.visualize() # ----------------------- pred_pose_9d_candidates = predict_result.candidate_9d_poses #import ipdb; ipdb.set_trace() for pred_pose_9d in pred_pose_9d_candidates: #import ipdb; ipdb.set_trace() pred_pose_9d = np.array(pred_pose_9d, dtype=np.float32) pred_pose[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(pred_pose_9d[:6]) pred_pose[:3,3] = pred_pose_9d[6:] try: new_target_pts, new_target_normals, new_scan_points_indices = RenderUtil.render_pts(pred_pose, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose) #import ipdb; ipdb.set_trace() if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold): curr_overlap_area_threshold = overlap_area_threshold else: curr_overlap_area_threshold = overlap_area_threshold * 0.5 downsampled_new_target_pts = PtsUtil.voxel_downsample_point_cloud(new_target_pts, voxel_threshold) #import ipdb; ipdb.set_trace() if self.overlap_limit: overlap, _ = ReconstructionUtil.check_overlap(downsampled_new_target_pts, voxel_downsampled_combined_scanned_pts_np, overlap_area_threshold = curr_overlap_area_threshold, voxel_size=voxel_threshold, require_new_added_pts_num = True) if not overlap: Log.yellow("no overlap!") retry += 1 retry_overlap_pose.append(pred_pose.tolist()) continue history_indices.append(new_scan_points_indices) except Exception as e: Log.error(f"Error in scene {scene_path}, {e}") print("current pose: ", pred_pose) print("curr_pred_cr: ", last_pred_cr) retry_no_pts_pose.append(pred_pose.tolist()) retry += 1 continue if new_target_pts.shape[0] == 0: Log.red("no pts in new target") retry_no_pts_pose.append(pred_pose.tolist()) retry += 1 continue pred_cr, _ = self.compute_coverage_rate(scanned_view_pts, new_target_pts, down_sampled_model_pts, threshold=voxel_threshold) Log.yellow(f"{pred_cr}, {last_pred_cr}, max: , {data['seq_max_coverage_rate']}") if pred_cr >= data["seq_max_coverage_rate"] - 1e-3: print("max coverage rate reached!: ", pred_cr) pred_cr_seq.append(pred_cr) scanned_view_pts.append(new_target_pts) pred_pose_9d = pred_pose_9d.reshape(1, -1) input_data["scanned_n_to_world_pose_9d"] = [np.concatenate([input_data["scanned_n_to_world_pose_9d"][0], pred_pose_9d], axis=0)] combined_scanned_pts = np.vstack(scanned_view_pts) voxel_downsampled_combined_scanned_pts_np = PtsUtil.voxel_downsample_point_cloud(combined_scanned_pts, voxel_threshold) random_downsampled_combined_scanned_pts_np = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, input_pts_N) self.pbnbv.capture(np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32), pred_pose) input_data["combined_scanned_pts"] = np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32) input_data["scanned_pts"] = [np.concatenate([input_data["scanned_pts"][0], np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32)], axis=0)] last_pred_cr = pred_cr pts_num = voxel_downsampled_combined_scanned_pts_np.shape[0] Log.info(f"delta pts num:,{pts_num - last_pts_num },{pts_num}, {last_pts_num}") if pts_num - last_pts_num < self.min_new_pts_num and pred_cr <= data["seq_max_coverage_rate"] - 1e-2: retry += 1 retry_duplication_pose.append(pred_pose.tolist()) Log.red(f"delta pts num < {self.min_new_pts_num}:, {pts_num}, {last_pts_num}") elif pts_num - last_pts_num < self.min_new_pts_num and pred_cr > data["seq_max_coverage_rate"] - 1e-2: success += 1 Log.success(f"delta pts num < {self.min_new_pts_num}:, {pts_num}, {last_pts_num}") last_pts_num = pts_num input_data["scanned_n_to_world_pose_9d"] = input_data["scanned_n_to_world_pose_9d"][0].tolist() result = { "pred_pose_9d_seq": input_data["scanned_n_to_world_pose_9d"], "combined_scanned_pts": input_data["combined_scanned_pts"], "target_pts_seq": scanned_view_pts, "coverage_rate_seq": pred_cr_seq, "max_coverage_rate": data["seq_max_coverage_rate"], "pred_max_coverage_rate": max(pred_cr_seq), "scene_name": scene_name, "retry_no_pts_pose": retry_no_pts_pose, "retry_duplication_pose": retry_duplication_pose, "retry_overlap_pose": retry_overlap_pose, "best_seq_len": data["best_seq_len"], } self.stat_result[scene_name] = { "coverage_rate_seq": pred_cr_seq, "pred_max_coverage_rate": max(pred_cr_seq), "pred_seq_len": len(pred_cr_seq), } print('success rate: ', max(pred_cr_seq)) return result def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003): voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32) unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True) idx_sort = np.argsort(inverse) idx_unique = idx_sort[np.cumsum(counts)-counts] downsampled_points = point_cloud[idx_unique] return downsampled_points, inverse def compute_coverage_rate(self, scanned_view_pts, new_pts, model_pts, threshold=0.005): if new_pts is not None: new_scanned_view_pts = scanned_view_pts + [new_pts] else: new_scanned_view_pts = scanned_view_pts combined_point_cloud = np.vstack(new_scanned_view_pts) down_sampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud,threshold) return ReconstructionUtil.compute_coverage_rate(model_pts, down_sampled_combined_point_cloud, threshold) def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003): voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32) unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True) idx_sort = np.argsort(inverse) idx_unique = idx_sort[np.cumsum(counts)-counts] downsampled_points = point_cloud[idx_unique] return downsampled_points, inverse def save_inference_result(self, dataset_name, scene_name, output): dataset_dir = os.path.join(self.output_dir, dataset_name) if not os.path.exists(dataset_dir): os.makedirs(dataset_dir) output_path = os.path.join(dataset_dir, f"{scene_name}.pkl") pickle.dump(output, open(output_path, "wb")) with open(self.stat_result_path, "w") as f: json.dump(self.stat_result, f) def get_checkpoint_path(self, is_last=False): return os.path.join(self.experiment_path, namespace.Direcotry.CHECKPOINT_DIR_NAME, "Epoch_{}.pth".format( self.current_epoch if self.current_epoch != -1 and not is_last else "last")) def load_experiment(self, backup_name=None): super().load_experiment(backup_name) self.current_epoch = self.experiments_config["epoch"] #self.load_checkpoint(is_last=(self.current_epoch == -1)) def create_experiment(self, backup_name=None): super().create_experiment(backup_name) def load(self, path): # 如果仍然需要加载某些数据,可以使用numpy的load方法 pass def print_info(self): def print_dataset(dataset: BaseDataset): config = dataset.get_config() name = dataset.get_name() Log.blue(f"Dataset: {name}") for k,v in config.items(): Log.blue(f"\t{k}: {v}") super().print_info() table_size = 70 Log.blue(f"{'+' + '-' * (table_size // 2)} Pipeline {'-' * (table_size // 2)}" + '+') #Log.blue(self.pipeline) Log.blue(f"{'+' + '-' * (table_size // 2)} Datasets {'-' * (table_size // 2)}" + '+') for i, test_set in enumerate(self.test_set_list): Log.blue(f"test dataset {i}: ") print_dataset(test_set) Log.blue(f"{'+' + '-' * (table_size // 2)}----------{'-' * (table_size // 2)}" + '+')