import os import json from utils.render import RenderUtil from utils.pose import PoseUtil from utils.pts import PtsUtil from utils.reconstruction import ReconstructionUtil from beans.predict_result import PredictResult from tqdm import tqdm import numpy as np import pickle from PytorchBoot.config import ConfigManager import PytorchBoot.namespace as namespace import PytorchBoot.stereotype as stereotype from PytorchBoot.factory import ComponentFactory from PytorchBoot.dataset import BaseDataset from PytorchBoot.runners.runner import Runner from PytorchBoot.utils import Log from PytorchBoot.status import status_manager from utils.data_load import DataLoadUtil @stereotype.runner("evaluate_uncertainty_guide") class EvaluateUncertaintyGuide(Runner): def __init__(self, config_path): super().__init__(config_path) self.script_path = ConfigManager.get(namespace.Stereotype.RUNNER, "blender_script_path") self.output_dir = ConfigManager.get(namespace.Stereotype.RUNNER, "output_dir") self.voxel_size = ConfigManager.get(namespace.Stereotype.RUNNER, "voxel_size") self.min_new_area = ConfigManager.get(namespace.Stereotype.RUNNER, "min_new_area") CM = 0.01 self.min_new_pts_num = self.min_new_area * (CM / self.voxel_size) ** 2 self.overlap_limit = ConfigManager.get(namespace.Stereotype.RUNNER, "overlap_limit") self.radius = 0.5 self.output_data_root = ConfigManager.get(namespace.Stereotype.RUNNER, "output_data_root") self.output_data = dict() for scene_name in os.listdir(self.output_data_root): real_scene_name = scene_name[:-len("_poses.npy")] self.output_data[real_scene_name] = np.load(os.path.join(self.output_data_root, scene_name)) #mport ipdb; ipdb.set_trace() ''' Experiment ''' self.load_experiment("nbv_evaluator") self.stat_result_path = os.path.join(self.output_dir, "stat.json") if os.path.exists(self.stat_result_path): with open(self.stat_result_path, "r") as f: self.stat_result = json.load(f) else: self.stat_result = {} ''' Test ''' self.test_config = ConfigManager.get(namespace.Stereotype.RUNNER, namespace.Mode.TEST) self.test_dataset_name_list = self.test_config["dataset_list"] self.test_set_list = [] self.test_writer_list = [] seen_name = set() for test_dataset_name in self.test_dataset_name_list: if test_dataset_name not in seen_name: seen_name.add(test_dataset_name) else: raise ValueError("Duplicate test dataset name: {}".format(test_dataset_name)) test_set: BaseDataset = ComponentFactory.create(namespace.Stereotype.DATASET, test_dataset_name) self.test_set_list.append(test_set) self.print_info() def run(self): Log.info("Loading from epoch {}.".format(self.current_epoch)) self.inference() Log.success("Inference finished.") def inference(self): #self.pipeline.eval() test_set: BaseDataset for dataset_idx, test_set in enumerate(self.test_set_list): status_manager.set_progress("inference", "inferencer", f"dataset", dataset_idx, len(self.test_set_list)) test_set_name = test_set.get_name() total=int(len(test_set)) for i in tqdm(range(total), desc=f"Processing {test_set_name}", ncols=100): try: data = test_set.__getitem__(i) scene_name = data["scene_name"] inference_result_path = os.path.join(self.output_dir, test_set_name, f"{scene_name}.pkl") if os.path.exists(inference_result_path): Log.info(f"Inference result already exists for scene: {scene_name}") continue status_manager.set_progress("inference", "inferencer", f"Batch[{test_set_name}]", i+1, total) output = self.predict_sequence(data) self.save_inference_result(test_set_name, data["scene_name"], output) except Exception as e: print(e) Log.error(f"Error, {e}") continue status_manager.set_progress("inference", "inferencer", f"dataset", len(self.test_set_list), len(self.test_set_list)) def get_output_data(self, scene_name, idx): pose_matrix = self.output_data[scene_name][idx] offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]) pose_matrix = pose_matrix @ offset rot = pose_matrix[:3,:3] pose_6d = PoseUtil.matrix_to_rotation_6d_numpy(rot) # 计算相机在球面上的位置 camera_direction = rot[:, 2] # 相机朝向球心 translation = -self.radius * camera_direction # 相机位置在球面上 pose_9d = np.concatenate([pose_6d, translation], axis=0).reshape(1,9) pose_9d = pose_9d.repeat(50, axis=0) #import ipdb; ipdb.set_trace() return {"pred_pose_9d": pose_9d} def predict_sequence(self, data, cr_increase_threshold=0, overlap_area_threshold=25, scan_points_threshold=10, max_iter=50, max_retry = 10, max_success=3): scene_name = data["scene_name"] Log.info(f"Processing scene: {scene_name}") status_manager.set_status("inference", "inferencer", "scene", scene_name) ''' data for rendering ''' scene_path = data["scene_path"] O_to_L_pose = data["O_to_L_pose"] voxel_threshold = self.voxel_size filter_degree = 75 down_sampled_model_pts = data["gt_pts"] first_frame_to_world_9d = data["first_scanned_n_to_world_pose_9d"][0] first_frame_to_world = np.eye(4) first_frame_to_world[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(first_frame_to_world_9d[:6]) first_frame_to_world[:3,3] = first_frame_to_world_9d[6:] ''' data for inference ''' input_data = {} input_data["combined_scanned_pts"] = np.array(data["first_scanned_pts"][0], dtype=np.float32) input_data["scanned_pts"] = [np.array(data["first_scanned_pts"][0], dtype=np.float32)] input_data["scanned_pts_mask"] = [np.zeros(input_data["combined_scanned_pts"].shape[0], dtype=np.bool_)] input_data["scanned_n_to_world_pose_9d"] = [np.array(data["first_scanned_n_to_world_pose_9d"], dtype=np.float32)] input_data["mode"] = namespace.Mode.TEST input_pts_N = input_data["combined_scanned_pts"].shape[0] root = os.path.dirname(scene_path) display_table_info = DataLoadUtil.get_display_table_info(root, scene_name) radius = display_table_info["radius"] scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius)) first_frame_target_pts, first_frame_target_normals, first_frame_scan_points_indices = RenderUtil.render_pts(first_frame_to_world, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose) scanned_view_pts = [first_frame_target_pts] history_indices = [first_frame_scan_points_indices] last_pred_cr, added_pts_num = self.compute_coverage_rate(scanned_view_pts, None, down_sampled_model_pts, threshold=voxel_threshold) retry_duplication_pose = [] retry_no_pts_pose = [] retry_overlap_pose = [] retry = 0 pred_cr_seq = [last_pred_cr] success = 0 last_pts_num = PtsUtil.voxel_downsample_point_cloud(data["first_scanned_pts"][0], voxel_threshold).shape[0] #import time for i in range(len(self.output_data[scene_name])): #import ipdb; ipdb.set_trace() Log.green(f"iter: {len(pred_cr_seq)}, retry: {retry}/{max_retry}, success: {success}/{max_success}") combined_scanned_pts = np.vstack(scanned_view_pts) voxel_downsampled_combined_scanned_pts_np, inverse = self.voxel_downsample_with_mapping(combined_scanned_pts, voxel_threshold) output = self.get_output_data(scene_name, i) pred_pose_9d = output["pred_pose_9d"] pred_pose = np.eye(4) predict_result = PredictResult(pred_pose_9d, input_pts=input_data["combined_scanned_pts"], cluster_params=dict(eps=0.25, min_samples=3)) # ----------------------- # import ipdb; ipdb.set_trace() # predict_result.visualize() # ----------------------- pred_pose_9d_candidates = predict_result.candidate_9d_poses #import ipdb; ipdb.set_trace() for pred_pose_9d in pred_pose_9d_candidates: #import ipdb; ipdb.set_trace() pred_pose_9d = np.array(pred_pose_9d, dtype=np.float32) pred_pose[:3,:3] = PoseUtil.rotation_6d_to_matrix_numpy(pred_pose_9d[:6]) pred_pose[:3,3] = pred_pose_9d[6:] try: new_target_pts, new_target_normals, new_scan_points_indices = RenderUtil.render_pts(pred_pose, scene_path, self.script_path, scan_points, voxel_threshold=voxel_threshold, filter_degree=filter_degree, nO_to_nL_pose=O_to_L_pose) #import ipdb; ipdb.set_trace() if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold): curr_overlap_area_threshold = overlap_area_threshold else: curr_overlap_area_threshold = overlap_area_threshold * 0.5 downsampled_new_target_pts = PtsUtil.voxel_downsample_point_cloud(new_target_pts, voxel_threshold) #import ipdb; ipdb.set_trace() if self.overlap_limit: overlap, _ = ReconstructionUtil.check_overlap(downsampled_new_target_pts, voxel_downsampled_combined_scanned_pts_np, overlap_area_threshold = curr_overlap_area_threshold, voxel_size=voxel_threshold, require_new_added_pts_num = True) if not overlap: Log.yellow("no overlap!") retry += 1 retry_overlap_pose.append(pred_pose.tolist()) continue history_indices.append(new_scan_points_indices) except Exception as e: Log.error(f"Error in scene {scene_path}, {e}") print("current pose: ", pred_pose) print("curr_pred_cr: ", last_pred_cr) retry_no_pts_pose.append(pred_pose.tolist()) retry += 1 continue if new_target_pts.shape[0] == 0: Log.red("no pts in new target") retry_no_pts_pose.append(pred_pose.tolist()) retry += 1 continue pred_cr, _ = self.compute_coverage_rate(scanned_view_pts, new_target_pts, down_sampled_model_pts, threshold=voxel_threshold) Log.yellow(f"{pred_cr}, {last_pred_cr}, max: , {data['seq_max_coverage_rate']}") if pred_cr >= data["seq_max_coverage_rate"] - 1e-3: print("max coverage rate reached!: ", pred_cr) pred_cr_seq.append(pred_cr) scanned_view_pts.append(new_target_pts) pred_pose_9d = pred_pose_9d.reshape(1, -1) input_data["scanned_n_to_world_pose_9d"] = [np.concatenate([input_data["scanned_n_to_world_pose_9d"][0], pred_pose_9d], axis=0)] combined_scanned_pts = np.vstack(scanned_view_pts) voxel_downsampled_combined_scanned_pts_np = PtsUtil.voxel_downsample_point_cloud(combined_scanned_pts, voxel_threshold) random_downsampled_combined_scanned_pts_np = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, input_pts_N) input_data["combined_scanned_pts"] = np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32) input_data["scanned_pts"] = [np.concatenate([input_data["scanned_pts"][0], np.array(random_downsampled_combined_scanned_pts_np, dtype=np.float32)], axis=0)] last_pred_cr = pred_cr pts_num = voxel_downsampled_combined_scanned_pts_np.shape[0] Log.info(f"delta pts num:,{pts_num - last_pts_num },{pts_num}, {last_pts_num}") if pts_num - last_pts_num < self.min_new_pts_num and pred_cr <= data["seq_max_coverage_rate"] - 1e-2: retry += 1 retry_duplication_pose.append(pred_pose.tolist()) Log.red(f"delta pts num < {self.min_new_pts_num}:, {pts_num}, {last_pts_num}") elif pts_num - last_pts_num < self.min_new_pts_num and pred_cr > data["seq_max_coverage_rate"] - 1e-2: success += 1 Log.success(f"delta pts num < {self.min_new_pts_num}:, {pts_num}, {last_pts_num}") last_pts_num = pts_num input_data["scanned_n_to_world_pose_9d"] = input_data["scanned_n_to_world_pose_9d"][0].tolist() result = { "pred_pose_9d_seq": input_data["scanned_n_to_world_pose_9d"], "combined_scanned_pts": input_data["combined_scanned_pts"], "target_pts_seq": scanned_view_pts, "coverage_rate_seq": pred_cr_seq, "max_coverage_rate": data["seq_max_coverage_rate"], "pred_max_coverage_rate": max(pred_cr_seq), "scene_name": scene_name, "retry_no_pts_pose": retry_no_pts_pose, "retry_duplication_pose": retry_duplication_pose, "retry_overlap_pose": retry_overlap_pose, "best_seq_len": data["best_seq_len"], } self.stat_result[scene_name] = { "coverage_rate_seq": pred_cr_seq, "pred_max_coverage_rate": max(pred_cr_seq), "pred_seq_len": len(pred_cr_seq), } print('success rate: ', max(pred_cr_seq)) return result def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003): voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32) unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True) idx_sort = np.argsort(inverse) idx_unique = idx_sort[np.cumsum(counts)-counts] downsampled_points = point_cloud[idx_unique] return downsampled_points, inverse def compute_coverage_rate(self, scanned_view_pts, new_pts, model_pts, threshold=0.005): if new_pts is not None: new_scanned_view_pts = scanned_view_pts + [new_pts] else: new_scanned_view_pts = scanned_view_pts combined_point_cloud = np.vstack(new_scanned_view_pts) down_sampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud,threshold) return ReconstructionUtil.compute_coverage_rate(model_pts, down_sampled_combined_point_cloud, threshold) def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003): voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32) unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True) idx_sort = np.argsort(inverse) idx_unique = idx_sort[np.cumsum(counts)-counts] downsampled_points = point_cloud[idx_unique] return downsampled_points, inverse def save_inference_result(self, dataset_name, scene_name, output): dataset_dir = os.path.join(self.output_dir, dataset_name) if not os.path.exists(dataset_dir): os.makedirs(dataset_dir) output_path = os.path.join(dataset_dir, f"{scene_name}.pkl") pickle.dump(output, open(output_path, "wb")) with open(self.stat_result_path, "w") as f: json.dump(self.stat_result, f) def get_checkpoint_path(self, is_last=False): return os.path.join(self.experiment_path, namespace.Direcotry.CHECKPOINT_DIR_NAME, "Epoch_{}.pth".format( self.current_epoch if self.current_epoch != -1 and not is_last else "last")) def load_experiment(self, backup_name=None): super().load_experiment(backup_name) self.current_epoch = self.experiments_config["epoch"] #self.load_checkpoint(is_last=(self.current_epoch == -1)) def create_experiment(self, backup_name=None): super().create_experiment(backup_name) def load(self, path): # 如果仍然需要加载某些数据,可以使用numpy的load方法 pass def print_info(self): def print_dataset(dataset: BaseDataset): config = dataset.get_config() name = dataset.get_name() Log.blue(f"Dataset: {name}") for k,v in config.items(): Log.blue(f"\t{k}: {v}") super().print_info() table_size = 70 Log.blue(f"{'+' + '-' * (table_size // 2)} Pipeline {'-' * (table_size // 2)}" + '+') #Log.blue(self.pipeline) Log.blue(f"{'+' + '-' * (table_size // 2)} Datasets {'-' * (table_size // 2)}" + '+') for i, test_set in enumerate(self.test_set_list): Log.blue(f"test dataset {i}: ") print_dataset(test_set) Log.blue(f"{'+' + '-' * (table_size // 2)}----------{'-' * (table_size // 2)}" + '+')