diff --git a/runners/strategy_generator.py b/runners/strategy_generator.py index d6c1598..b295302 100644 --- a/runners/strategy_generator.py +++ b/runners/strategy_generator.py @@ -77,28 +77,40 @@ class StrategyGenerator(Runner): model_points_normals = DataLoadUtil.load_points_normals(root, scene_name) model_pts = model_points_normals[:,:3] down_sampled_model_pts = PtsUtil.voxel_downsample_point_cloud(model_pts, voxel_threshold) - + display_table_info = DataLoadUtil.get_display_table_info(root, scene_name) + radius = display_table_info["radius"] + top = DataLoadUtil.get_display_table_top(root, scene_name) + scan_points = ReconstructionUtil.generate_scan_points(display_table_top=top,display_table_radius=radius) pts_list = [] + scan_points_indices_list = [] for frame_idx in range(frame_num): if self.load_pts and os.path.exists(os.path.join(root,scene_name, "pts", f"{frame_idx}.txt")): sampled_point_cloud = np.loadtxt(os.path.join(root,scene_name, "pts", f"{frame_idx}.txt")) + indices = np.loadtxt(os.path.join(root,scene_name, "pts", f"{frame_idx}_indices.txt")).astype(np.int32).tolist() status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_idx, frame_num) pts_list.append(sampled_point_cloud) - continue + scan_points_indices_list.append(indices) + else: path = DataLoadUtil.get_path(root, scene_name, frame_idx) cam_params = DataLoadUtil.load_cam_info(path, binocular=True) status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_idx, frame_num) - point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True) + point_cloud, display_table_pts = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True, get_display_table_pts=True) sampled_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=self.filter_degree) - + covered_pts, indices = ReconstructionUtil.compute_covered_scan_points(scan_points, display_table_pts) if self.save_pts: pts_dir = os.path.join(root,scene_name, "pts") + covered_pts_dir = os.path.join(pts_dir, "covered_scan_pts") if not os.path.exists(pts_dir): os.makedirs(pts_dir) + if not os.path.exists(covered_pts_dir): + os.makedirs(covered_pts_dir) np.savetxt(os.path.join(pts_dir, f"{frame_idx}.txt"), sampled_point_cloud) + np.savetxt(os.path.join(covered_pts_dir, f"{frame_idx}.txt"), covered_pts) + np.savetxt(os.path.join(pts_dir, f"{frame_idx}_indices.txt"), indices) pts_list.append(sampled_point_cloud) + scan_points_indices_list.append(indices) status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_num, frame_num) seq_num = min(self.seq_num, len(pts_list)) diff --git a/utils/data_load.py b/utils/data_load.py index 5cebfd0..318a6d5 100644 --- a/utils/data_load.py +++ b/utils/data_load.py @@ -6,56 +6,61 @@ import trimesh import torch from utils.pts import PtsUtil + class DataLoadUtil: - TABLE_POSITION = np.asarray([0,0,0.8215]) + TABLE_POSITION = np.asarray([0, 0, 0.8215]) @staticmethod def get_display_table_info(root, scene_name): scene_info = DataLoadUtil.load_scene_info(root, scene_name) display_table_info = scene_info["display_table"] return display_table_info - + @staticmethod def get_display_table_top(root, scene_name): - display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)["height"] - display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray([0,0,display_table_height]) + display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)[ + "height" + ] + display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray( + [0, 0, display_table_height] + ) return display_table_top - + @staticmethod def get_path(root, scene_name, frame_idx): path = os.path.join(root, scene_name, f"{frame_idx}") return path - + @staticmethod def get_label_num(root, scene_name): - label_dir = os.path.join(root,scene_name,"label") + label_dir = os.path.join(root, scene_name, "label") return len(os.listdir(label_dir)) - + @staticmethod def get_label_path(root, scene_name, seq_idx): - label_dir = os.path.join(root,scene_name,"label") + label_dir = os.path.join(root, scene_name, "label") if not os.path.exists(label_dir): os.makedirs(label_dir) - path = os.path.join(label_dir,f"{seq_idx}.json") + path = os.path.join(label_dir, f"{seq_idx}.json") return path - + @staticmethod def get_label_path_old(root, scene_name): - path = os.path.join(root,scene_name,"label.json") + path = os.path.join(root, scene_name, "label.json") return path - + @staticmethod def get_scene_seq_length(root, scene_name): camera_params_path = os.path.join(root, scene_name, "camera_params") return len(os.listdir(camera_params_path)) - + @staticmethod def load_mesh_at(model_dir, object_name, world_object_pose): model_path = os.path.join(model_dir, object_name, "mesh.obj") mesh = trimesh.load(model_path) mesh.apply_transform(world_object_pose) return mesh - + @staticmethod def get_bbox_diag(model_dir, object_name): model_path = os.path.join(model_dir, object_name, "mesh.obj") @@ -63,8 +68,7 @@ class DataLoadUtil: bbox = mesh.bounding_box.extents diagonal_length = np.linalg.norm(bbox) return diagonal_length - - + @staticmethod def save_mesh_at(model_dir, output_dir, object_name, scene_name, world_object_pose): mesh = DataLoadUtil.load_mesh_at(model_dir, object_name, world_object_pose) @@ -72,12 +76,16 @@ class DataLoadUtil: mesh.export(model_path) @staticmethod - def save_target_mesh_at_world_space(root, model_dir, scene_name, display_table_as_world_space_origin=True): + def save_target_mesh_at_world_space( + root, model_dir, scene_name, display_table_as_world_space_origin=True + ): scene_info = DataLoadUtil.load_scene_info(root, scene_name) target_name = scene_info["target_name"] transformation = scene_info[target_name] if display_table_as_world_space_origin: - location = transformation["location"] - DataLoadUtil.get_display_table_top(root, scene_name) + location = transformation["location"] - DataLoadUtil.get_display_table_top( + root, scene_name + ) else: location = transformation["location"] rotation_euler = transformation["rotation_euler"] @@ -90,21 +98,21 @@ class DataLoadUtil: os.makedirs(mesh_dir) model_path = os.path.join(mesh_dir, "world_target_mesh.obj") mesh.export(model_path) - + @staticmethod def load_scene_info(root, scene_name): scene_info_path = os.path.join(root, scene_name, "scene_info.json") with open(scene_info_path, "r") as f: scene_info = json.load(f) return scene_info - + @staticmethod def load_target_pts_num_dict(root, scene_name): target_pts_num_path = os.path.join(root, scene_name, "target_pts_num.json") with open(target_pts_num_path, "r") as f: target_pts_num_dict = json.load(f) return target_pts_num_dict - + @staticmethod def load_target_object_pose(root, scene_name): scene_info = DataLoadUtil.load_scene_info(root, scene_name) @@ -115,10 +123,10 @@ class DataLoadUtil: pose_mat = trimesh.transformations.euler_matrix(*rotation_euler) pose_mat[:3, 3] = location return pose_mat - + @staticmethod - def load_depth(path, min_depth=0.01,max_depth=5.0,binocular=False): - + def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False): + def load_depth_from_real_path(real_path, min_depth, max_depth): depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED) depth = depth.astype(np.float32) / 65535.0 @@ -126,78 +134,104 @@ class DataLoadUtil: max_depth = max_depth depth_meters = min_depth + (max_depth - min_depth) * depth return depth_meters - + if binocular: - depth_path_L = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_L.png") - depth_path_R = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_R.png") - depth_meters_L = load_depth_from_real_path(depth_path_L, min_depth, max_depth) - depth_meters_R = load_depth_from_real_path(depth_path_R, min_depth, max_depth) + depth_path_L = os.path.join( + os.path.dirname(path), "depth", os.path.basename(path) + "_L.png" + ) + depth_path_R = os.path.join( + os.path.dirname(path), "depth", os.path.basename(path) + "_R.png" + ) + depth_meters_L = load_depth_from_real_path( + depth_path_L, min_depth, max_depth + ) + depth_meters_R = load_depth_from_real_path( + depth_path_R, min_depth, max_depth + ) return depth_meters_L, depth_meters_R else: - depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png") + depth_path = os.path.join( + os.path.dirname(path), "depth", os.path.basename(path) + ".png" + ) depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth) return depth_meters - + @staticmethod def load_seg(path, binocular=False): if binocular: + def clean_mask(mask_image): green = [0, 255, 0, 255] red = [255, 0, 0, 255] threshold = 2 - mask_image = np.where(np.abs(mask_image - green) <= threshold, green, mask_image) - mask_image = np.where(np.abs(mask_image - red) <= threshold, red, mask_image) + mask_image = np.where( + np.abs(mask_image - green) <= threshold, green, mask_image + ) + mask_image = np.where( + np.abs(mask_image - red) <= threshold, red, mask_image + ) return mask_image - mask_path_L = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_L.png") + + mask_path_L = os.path.join( + os.path.dirname(path), "mask", os.path.basename(path) + "_L.png" + ) mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED)) - mask_path_R = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_R.png") + mask_path_R = os.path.join( + os.path.dirname(path), "mask", os.path.basename(path) + "_R.png" + ) mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED)) return mask_image_L, mask_image_R else: - mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png") + mask_path = os.path.join( + os.path.dirname(path), "mask", os.path.basename(path) + ".png" + ) mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) return mask_image - + @staticmethod def load_label(path): - with open(path, 'r') as f: + with open(path, "r") as f: label_data = json.load(f) return label_data - + @staticmethod def load_rgb(path): - rgb_path = os.path.join(os.path.dirname(path), "rgb", os.path.basename(path) + ".png") + rgb_path = os.path.join( + os.path.dirname(path), "rgb", os.path.basename(path) + ".png" + ) rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR) return rgb_image - + @staticmethod def load_from_preprocessed_pts(path): - npy_path = os.path.join(os.path.dirname(path), "points", os.path.basename(path) + ".npy") + npy_path = os.path.join( + os.path.dirname(path), "points", os.path.basename(path) + ".npy" + ) pts = np.load(npy_path) return pts @staticmethod def cam_pose_transformation(cam_pose_before): - offset = np.asarray([ - [1, 0, 0, 0], - [0, -1, 0, 0], - [0, 0, -1, 0], - [0, 0, 0, 1]]) - cam_pose_after = cam_pose_before @ offset + offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]) + cam_pose_after = cam_pose_before @ offset return cam_pose_after - + @staticmethod def load_cam_info(path, binocular=False, display_table_as_world_space_origin=True): scene_dir = os.path.dirname(path) root_dir = os.path.dirname(scene_dir) scene_name = os.path.basename(scene_dir) - camera_params_path = os.path.join(os.path.dirname(path), "camera_params", os.path.basename(path) + ".json") - with open(camera_params_path, 'r') as f: + camera_params_path = os.path.join( + os.path.dirname(path), "camera_params", os.path.basename(path) + ".json" + ) + with open(camera_params_path, "r") as f: label_data = json.load(f) cam_to_world = np.asarray(label_data["extrinsic"]) cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world) world_to_display_table = np.eye(4) - world_to_display_table[:3, 3] = - DataLoadUtil.get_display_table_top(root_dir, scene_name) + world_to_display_table[:3, 3] = -DataLoadUtil.get_display_table_top( + root_dir, scene_name + ) if display_table_as_world_space_origin: cam_to_world = np.dot(world_to_display_table, cam_to_world) cam_intrinsic = np.asarray(label_data["intrinsic"]) @@ -205,7 +239,7 @@ class DataLoadUtil: "cam_to_world": cam_to_world, "cam_intrinsic": cam_intrinsic, "far_plane": label_data["far_plane"], - "near_plane": label_data["near_plane"] + "near_plane": label_data["near_plane"], } if binocular: cam_to_world_R = np.asarray(label_data["extrinsic_R"]) @@ -218,104 +252,165 @@ class DataLoadUtil: cam_info["cam_to_world_O"] = cam_to_world_O cam_info["cam_to_world_R"] = cam_to_world_R return cam_info - + @staticmethod - def get_real_cam_O_from_cam_L(cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True): + def get_real_cam_O_from_cam_L( + cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True + ): root_dir = os.path.dirname(scene_path) scene_name = os.path.basename(scene_path) if isinstance(cam_L, torch.Tensor): cam_L = cam_L.cpu().numpy() - nO_to_display_table_pose = cam_L @ cam_O_to_cam_L + nO_to_display_table_pose = cam_L @ cam_O_to_cam_L if display_table_as_world_space_origin: display_table_to_world = np.eye(4) - display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top(root_dir, scene_name) + display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top( + root_dir, scene_name + ) nO_to_world_pose = np.dot(display_table_to_world, nO_to_display_table_pose) nO_to_world_pose = DataLoadUtil.cam_pose_transformation(nO_to_world_pose) return nO_to_world_pose - + @staticmethod - def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0,255,0,255)): + def get_target_point_cloud( + depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255) + ): h, w = depth.shape - i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy') - + i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy") + z = depth x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0] y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1] - - points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3) - mask = mask.reshape(-1,4) - target_mask = (mask == target_mask_label).all(axis=-1) - + points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3) + mask = mask.reshape(-1, 4) + + target_mask = (mask == target_mask_label).all(axis=-1) + target_points_camera = points_camera[target_mask] - target_points_camera_aug = np.concatenate([target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1) - + target_points_camera_aug = np.concatenate( + [target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1 + ) + target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3] return { "points_world": target_points_world, - "points_camera": target_points_camera + "points_camera": target_points_camera, } - + @staticmethod def get_point_cloud(depth, cam_intrinsic, cam_extrinsic): h, w = depth.shape - i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy') - + i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy") + z = depth x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0] y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1] - + points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3) - points_camera_aug = np.concatenate([points_camera, np.ones((points_camera.shape[0], 1))], axis=-1) - + points_camera_aug = np.concatenate( + [points_camera, np.ones((points_camera.shape[0], 1))], axis=-1 + ) + points_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3] - return { - "points_world": points_world, - "points_camera": points_camera - } - + return {"points_world": points_world, "points_camera": points_camera} + @staticmethod - def get_target_point_cloud_world_from_path(path, binocular=False, random_downsample_N=65536, voxel_size = 0.005, target_mask_label=(0,255,0,255)): + def get_target_point_cloud_world_from_path( + path, + binocular=False, + random_downsample_N=65536, + voxel_size=0.005, + target_mask_label=(0, 255, 0, 255), + display_table_mask_label=(255, 0, 0, 255), + get_display_table_pts=False + ): cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular) if binocular: - depth_L, depth_R = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'], binocular=True) + depth_L, depth_R = DataLoadUtil.load_depth( + path, cam_info["near_plane"], cam_info["far_plane"], binocular=True + ) mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True) - point_cloud_L = DataLoadUtil.get_target_point_cloud(depth_L, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask_L, target_mask_label)['points_world'] - point_cloud_R = DataLoadUtil.get_target_point_cloud(depth_R, cam_info['cam_intrinsic'], cam_info['cam_to_world_R'], mask_R, target_mask_label)['points_world'] - point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, random_downsample_N) - point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, random_downsample_N) - overlap_points = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size) + point_cloud_L = DataLoadUtil.get_target_point_cloud( + depth_L, + cam_info["cam_intrinsic"], + cam_info["cam_to_world"], + mask_L, + target_mask_label, + )["points_world"] + point_cloud_R = DataLoadUtil.get_target_point_cloud( + depth_R, + cam_info["cam_intrinsic"], + cam_info["cam_to_world_R"], + mask_R, + target_mask_label, + )["points_world"] + point_cloud_L = PtsUtil.random_downsample_point_cloud( + point_cloud_L, random_downsample_N + ) + point_cloud_R = PtsUtil.random_downsample_point_cloud( + point_cloud_R, random_downsample_N + ) + overlap_points = DataLoadUtil.get_overlapping_points( + point_cloud_L, point_cloud_R, voxel_size + ) + if get_display_table_pts: + display_pts_L = DataLoadUtil.get_target_point_cloud( + depth_L, + cam_info["cam_intrinsic"], + cam_info["cam_to_world"], + mask_L, + display_table_mask_label, + )["points_world"] + display_pts_R = DataLoadUtil.get_target_point_cloud( + depth_R, + cam_info["cam_intrinsic"], + cam_info["cam_to_world_R"], + mask_R, + display_table_mask_label, + )["points_world"] + display_pts_overlap = DataLoadUtil.get_overlapping_points( + display_pts_L, display_pts_R, voxel_size + ) + return overlap_points, display_pts_overlap return overlap_points else: - depth = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane']) + depth = DataLoadUtil.load_depth( + path, cam_info["near_plane"], cam_info["far_plane"] + ) mask = DataLoadUtil.load_seg(path) - point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask)['points_world'] + point_cloud = DataLoadUtil.get_target_point_cloud( + depth, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask + )["points_world"] return point_cloud - - + @staticmethod def voxelize_points(points, voxel_size): - + voxel_indices = np.floor(points / voxel_size).astype(np.int32) unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True) return unique_voxels - + @staticmethod def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005): voxels_L, indices_L = DataLoadUtil.voxelize_points(point_cloud_L, voxel_size) voxels_R, _ = DataLoadUtil.voxelize_points(point_cloud_R, voxel_size) - voxel_indices_L = voxels_L.view([('', voxels_L.dtype)]*3) - voxel_indices_R = voxels_R.view([('', voxels_R.dtype)]*3) + voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3) + voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3) overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R) - mask_L = np.isin(indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]) + mask_L = np.isin( + indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0] + ) overlapping_points = point_cloud_L[mask_L] return overlapping_points - + @staticmethod def load_points_normals(root, scene_name, display_table_as_world_space_origin=True): points_path = os.path.join(root, scene_name, "points_and_normals.txt") points_normals = np.loadtxt(points_path) if display_table_as_world_space_origin: - points_normals[:,:3] = points_normals[:,:3] - DataLoadUtil.get_display_table_top(root, scene_name) - return points_normals \ No newline at end of file + points_normals[:, :3] = points_normals[ + :, :3 + ] - DataLoadUtil.get_display_table_top(root, scene_name) + return points_normals