import os import json import numpy as np import sys np.random.seed(0) # append parent directory to sys.path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) print(sys.path) from utils.reconstruction import ReconstructionUtil from utils.data_load import DataLoadUtil from utils.pts import PtsUtil def save_np_pts(path, pts: np.ndarray, file_type="txt"): if file_type == "txt": np.savetxt(path, pts) else: np.save(path, pts) def save_full_points(root, scene, frame_idx, full_points: np.ndarray, file_type="txt"): pts_path = os.path.join(root,scene, "scene_pts", f"{frame_idx}.{file_type}") if not os.path.exists(os.path.join(root,scene, "scene_pts")): os.makedirs(os.path.join(root,scene, "scene_pts")) save_np_pts(pts_path, full_points, file_type) def save_target_points(root, scene, frame_idx, target_points: np.ndarray, file_type="txt"): pts_path = os.path.join(root,scene, "target_pts", f"{frame_idx}.{file_type}") if not os.path.exists(os.path.join(root,scene, "target_pts")): os.makedirs(os.path.join(root,scene, "target_pts")) save_np_pts(pts_path, target_points, file_type) def save_mask_idx(root, scene, frame_idx, mask_idx: np.ndarray,filtered_idx, file_type="txt"): indices_path = os.path.join(root,scene, "mask_idx", f"{frame_idx}.{file_type}") if not os.path.exists(os.path.join(root,scene, "mask_idx")): os.makedirs(os.path.join(root,scene, "mask_idx")) save_np_pts(indices_path, mask_idx, file_type) filtered_path = os.path.join(root,scene, "mask_idx", f"{frame_idx}_filtered.{file_type}") save_np_pts(filtered_path, filtered_idx, file_type) def save_scan_points_indices(root, scene, frame_idx, scan_points_indices: np.ndarray, file_type="txt"): indices_path = os.path.join(root,scene, "scan_points_indices", f"{frame_idx}.{file_type}") if not os.path.exists(os.path.join(root,scene, "scan_points_indices")): os.makedirs(os.path.join(root,scene, "scan_points_indices")) save_np_pts(indices_path, scan_points_indices, file_type) def save_scan_points(root, scene, scan_points: np.ndarray): scan_points_path = os.path.join(root,scene, "scan_points.txt") save_np_pts(scan_points_path, scan_points) def get_world_points(depth, cam_intrinsic, cam_extrinsic): h, w = depth.shape i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy") z = depth x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0] y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1] points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3) points_camera_aug = np.concatenate((points_camera, np.ones((points_camera.shape[0], 1))), axis=-1) points_camera_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3] return points_camera_world def get_world_normals(normals, cam_extrinsic): normals = normals / np.linalg.norm(normals, axis=1, keepdims=True) normals_world = np.dot(cam_extrinsic[:3, :3], normals.T).T return normals_world def get_scan_points_indices(scan_points, mask, display_table_mask_label, cam_intrinsic, cam_extrinsic): scan_points_homogeneous = np.hstack((scan_points, np.ones((scan_points.shape[0], 1)))) points_camera = np.dot(cam_extrinsic, scan_points_homogeneous.T).T[:, :3] points_image_homogeneous = np.dot(cam_intrinsic, points_camera.T).T points_image_homogeneous /= points_image_homogeneous[:, 2:] pixel_x = points_image_homogeneous[:, 0].astype(int) pixel_y = points_image_homogeneous[:, 1].astype(int) h, w = mask.shape[:2] valid_indices = (pixel_x >= 0) & (pixel_x < w) & (pixel_y >= 0) & (pixel_y < h) mask_colors = mask[pixel_y[valid_indices], pixel_x[valid_indices]] selected_points_indices = mask_colors == display_table_mask_label return selected_points_indices def save_scene_data(root, scene, scene_idx=0, scene_total=1): ''' configuration ''' target_mask_label = (0, 255, 0, 255) display_table_mask_label=(0, 0, 255, 255) random_downsample_N = 65536 train_input_pts_num = 8192 voxel_size=0.002 filter_degree = 75 ''' scan points ''' display_table_info = DataLoadUtil.get_display_table_info(root, scene) radius = display_table_info["radius"] scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius)) ''' read frame data(depth|mask|normal) ''' frame_num = DataLoadUtil.get_scene_seq_length(root, scene) for frame_id in range(frame_num): print(f"[scene({scene_idx}/{scene_total})|frame({frame_id}/{frame_num})]Processing {scene} frame {frame_id}") path = DataLoadUtil.get_path(root, scene, frame_id) cam_info = DataLoadUtil.load_cam_info(path, binocular=True) depth_L, depth_R = DataLoadUtil.load_depth( path, cam_info["near_plane"], cam_info["far_plane"], binocular=True ) mask_L = DataLoadUtil.load_seg(path, binocular=True, left_only=True) normal_L = DataLoadUtil.load_normal(path, binocular=True, left_only=True) ''' scene points ''' scene_points_L = get_world_points(depth_L, cam_info["cam_intrinsic"], cam_info["cam_to_world"]) scene_points_R = get_world_points(depth_R, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"]) sampled_scene_points_L, random_sample_idx_L = PtsUtil.random_downsample_point_cloud( scene_points_L, random_downsample_N, require_idx=True ) sampled_scene_points_R = PtsUtil.random_downsample_point_cloud( scene_points_R, random_downsample_N ) scene_overlap_points, overlap_idx_L = PtsUtil.get_overlapping_points( sampled_scene_points_L, sampled_scene_points_R, voxel_size, require_idx=True ) if scene_overlap_points.shape[0] < 1024: scene_overlap_points = sampled_scene_points_L overlap_idx_L = np.arange(sampled_scene_points_L.shape[0]) train_input_points, train_input_idx = PtsUtil.random_downsample_point_cloud( scene_overlap_points, train_input_pts_num, require_idx=True ) ''' target points ''' mask_img = mask_L mask_L = mask_L.reshape(-1, 4) mask_L = (mask_L == target_mask_label).all(axis=-1) mask_overlap = mask_L[random_sample_idx_L][overlap_idx_L] scene_normals_L = normal_L.reshape(-1, 3) target_overlap_normals = scene_normals_L[random_sample_idx_L][overlap_idx_L][mask_overlap] target_normals = get_world_normals(target_overlap_normals, cam_info["cam_to_world"]) target_points = scene_overlap_points[mask_overlap] filtered_target_points, filtered_idx = PtsUtil.filter_points( target_points, target_normals, cam_info["cam_to_world"], filter_degree, require_idx=True ) ''' train_input_mask ''' mask_train_input = mask_overlap[train_input_idx] ''' scan points indices ''' scan_points_indices = get_scan_points_indices(scan_points, mask_img, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world"]) save_full_points(root, scene, frame_id, train_input_points) save_target_points(root, scene, frame_id, filtered_target_points) save_mask_idx(root, scene, frame_id, mask_train_input, filtered_idx=filtered_idx) save_scan_points_indices(root, scene, frame_id, scan_points_indices) save_scan_points(root, scene, scan_points) # The "done" flag of scene preprocess if __name__ == "__main__": #root = "/media/hofee/repository/new_data_with_normal" root = "/media/hofee/repository/test_sample" list_path = "/media/hofee/repository/test_sample/test_sample_list.txt" scene_list = [] with open(list_path, "r") as f: for line in f: scene_list.append(line.strip()) from_idx = 0 to_idx = len(scene_list) cnt = 0 total = to_idx - from_idx for scene in scene_list[from_idx:to_idx]: save_scene_data(root, scene, cnt, total) cnt+=1