import os
import sys

import numpy as np
import torch

import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.dataset import BaseDataset
from PytorchBoot.utils.log_util import Log

# Appended before the ``utils.*`` imports below, which presumably resolve
# from this directory.
sys.path.append(r"/home/data/hofee/project/nbv_rec/nbv_reconstruction")

from utils.data_load import DataLoadUtil
from utils.pose import PoseUtil
from utils.pts import PtsUtil


@stereotype.dataset("old_seq_nbv_reconstruction_dataset")
class SeqNBVReconstructionDataset(BaseDataset):
    """Test-only dataset yielding one starting view per scene.

    For every scene listed in the split file, the label with the highest
    ``max_coverage_rate`` is selected, and the first frame of its
    ``best_sequence`` is used as the sample's initial view.
    """

    def __init__(self, config):
        """Read the configuration and build the per-scene data list.

        Args:
            config: Mapping that must provide ``type``, ``root_dir``,
                ``split_file``, ``pts_num``, ``model_dir`` and
                ``filter_degree``; ``load_from_preprocess`` is optional and
                defaults to ``False``.
        """
        super().__init__(config)
        self.type = config["type"]
        if self.type != namespace.Mode.TEST:
            Log.error("Dataset Only support test mode", terminate=True)
        self.config = config
        self.root_dir = config["root_dir"]
        self.split_file_path = config["split_file"]
        self.scene_name_list = self.load_scene_name_list()
        self.datalist = self.get_datalist()
        self.pts_num = config["pts_num"]
        self.model_dir = config["model_dir"]
        self.filter_degree = config["filter_degree"]
        self.load_from_preprocess = config.get("load_from_preprocess", False)

    def load_scene_name_list(self):
        """Return the scene names listed in the split file, one per line.

        Surrounding whitespace is stripped and blank lines are skipped so
        that an empty scene name is never handed to the data loaders.
        """
        with open(self.split_file_path, "r") as f:
            stripped_lines = (line.strip() for line in f)
            return [scene_name for scene_name in stripped_lines if scene_name]

    def _load_label(self, scene_name, seq_idx):
        """Load the label data of sequence ``seq_idx`` for ``scene_name``."""
        label_path = DataLoadUtil.get_label_path(
            self.root_dir, scene_name, seq_idx
        )
        return DataLoadUtil.load_label(label_path)

    def _select_best_label(self, scene_name):
        """Find the label with the highest ``max_coverage_rate`` in a scene.

        Returns:
            Tuple ``(label_idx, max_coverage_rate, label_data)``. Ties keep
            the earliest index. When no label has a rate above 0, the index
            and rate stay at 0 and the label at index 0 is loaded.
        """
        seq_num = DataLoadUtil.get_label_num(self.root_dir, scene_name)
        best_idx = 0
        best_rate = 0
        best_label = None
        for seq_idx in range(seq_num):
            label_data = self._load_label(scene_name, seq_idx)
            coverage_rate = label_data["max_coverage_rate"]
            if coverage_rate > best_rate:
                best_idx = seq_idx
                best_rate = coverage_rate
                # Keep the winning label so it is not read from disk again.
                best_label = label_data
        if best_label is None:
            best_label = self._load_label(scene_name, best_idx)
        return best_idx, best_rate, best_label

    def get_datalist(self):
        """Build one entry per scene describing its best label.

        Returns:
            List of dicts with the keys ``scene_name``, ``first_frame``,
            ``max_coverage_rate``, ``best_seq_len`` and ``label_idx``.
        """
        datalist = []
        for scene_name in self.scene_name_list:
            label_idx, max_coverage_rate, label_data = self._select_best_label(
                scene_name
            )
            best_sequence = label_data["best_sequence"]
            datalist.append({
                "scene_name": scene_name,
                "first_frame": best_sequence[0],
                "max_coverage_rate": max_coverage_rate,
                "best_seq_len": len(best_sequence),
                "label_idx": label_idx,
            })
        return datalist

    def __getitem__(self, index):
        """Assemble the sample for the scene at ``index``.

        Loads the first frame's camera info and preprocessed points,
        randomly downsamples the points to ``self.pts_num``, and encodes
        the left camera pose as a 6D rotation concatenated with the
        translation.

        Returns:
            Dict holding the numpy inputs and the per-scene metadata that
            ``get_collate_fn`` batches.
        """
        data_item_info = self.datalist[index]
        # ``first_frame`` is indexed as (frame index, coverage) below.
        first_frame_idx = data_item_info["first_frame"][0]
        first_frame_coverage = data_item_info["first_frame"][1]
        max_coverage_rate = data_item_info["max_coverage_rate"]
        scene_name = data_item_info["scene_name"]

        # The same view path serves both the camera info and the points.
        first_view_path = DataLoadUtil.get_path(
            self.root_dir, scene_name, first_frame_idx
        )
        first_cam_info = DataLoadUtil.load_cam_info(
            first_view_path, binocular=True
        )
        first_left_cam_pose = first_cam_info["cam_to_world"]
        first_center_cam_pose = first_cam_info["cam_to_world_O"]

        first_target_point_cloud = DataLoadUtil.load_from_preprocessed_pts(
            first_view_path
        )
        first_pts_num = first_target_point_cloud.shape[0]
        first_downsampled_target_point_cloud = (
            PtsUtil.random_downsample_point_cloud(
                first_target_point_cloud, self.pts_num
            )
        )

        first_to_world_rot_6d = PoseUtil.matrix_to_rotation_6d_numpy(
            np.asarray(first_left_cam_pose[:3, :3])
        )
        first_to_world_trans = first_left_cam_pose[:3, 3]
        first_to_world_9d = np.concatenate(
            [first_to_world_rot_6d, first_to_world_trans], axis=0
        )

        diag = DataLoadUtil.get_bbox_diag(self.model_dir, scene_name)
        voxel_threshold = diag * 0.02
        first_O_to_first_L_pose = np.dot(
            np.linalg.inv(first_left_cam_pose), first_center_cam_pose
        )
        scene_path = os.path.join(self.root_dir, scene_name)
        model_points_normals = DataLoadUtil.load_points_normals(
            self.root_dir, scene_name
        )

        data_item = {
            "first_pts_num": np.asarray(first_pts_num, dtype=np.int32),
            "first_pts": np.asarray(
                [first_downsampled_target_point_cloud], dtype=np.float32
            ),
            "combined_scanned_pts": np.asarray(
                first_downsampled_target_point_cloud, dtype=np.float32
            ),
            "first_to_world_9d": np.asarray(
                [first_to_world_9d], dtype=np.float32
            ),
            "scene_name": scene_name,
            "max_coverage_rate": max_coverage_rate,
            "voxel_threshold": voxel_threshold,
            "filter_degree": self.filter_degree,
            "O_to_L_pose": first_O_to_first_L_pose,
            "first_frame_coverage": first_frame_coverage,
            "scene_path": scene_path,
            "model_points_normals": model_points_normals,
            "best_seq_len": data_item_info["best_seq_len"],
            "first_frame_id": first_frame_idx,
        }
        return data_item

    def __len__(self):
        """Return the number of entries in the data list."""
        return len(self.datalist)

    def get_collate_fn(self):
        """Return the collate function used to batch samples.

        ``first_pts`` and ``first_to_world_9d`` become lists of tensors,
        ``combined_scanned_pts`` is stacked into one tensor, and every
        other key is gathered into a plain Python list.
        """
        tensor_keys = ("first_pts", "first_to_world_9d", "combined_scanned_pts")

        def collate_fn(batch):
            collate_data = {}
            collate_data["first_pts"] = [
                torch.tensor(item["first_pts"]) for item in batch
            ]
            collate_data["first_to_world_9d"] = [
                torch.tensor(item["first_to_world_9d"]) for item in batch
            ]
            collate_data["combined_scanned_pts"] = torch.stack(
                [torch.tensor(item["combined_scanned_pts"]) for item in batch]
            )
            for key in batch[0]:
                if key not in tensor_keys:
                    collate_data[key] = [item[key] for item in batch]
            return collate_data

        return collate_fn


# -------------- Debug ---------------- #
if __name__ == "__main__":
    seed = 0
    torch.manual_seed(seed)
    np.random.seed(seed)
    config = {
        "root_dir": "/home/data/hofee/project/nbv_rec/data/nbv_rec_data_512_preproc_npy",
        "split_file": "/home/data/hofee/project/nbv_rec/data/OmniObject3d_train.txt",
        "model_dir": "/home/data/hofee/project/nbv_rec/data/scaled_object_meshes",
        "ratio": 0.005,
        "batch_size": 2,
        "filter_degree": 75,
        "num_workers": 0,
        "pts_num": 32684,
        "type": namespace.Mode.TEST,
        "load_from_preprocess": True,
    }
    ds = SeqNBVReconstructionDataset(config)
    print(len(ds))
    # ds.__getitem__(10)
    dl = ds.get_loader(shuffle=True)
    for idx, data in enumerate(dl):
        data = ds.process_batch(data, "cuda:0")
        print(data)
        # ------ Debug Start ------
        import ipdb;ipdb.set_trace()
        # ------ Debug End ------