import os
import json

import numpy as np
import cv2
import trimesh

from pts import PtsUtil


class DataLoadUtil:
    """Static helpers for loading frames, camera parameters, meshes and point clouds
    from a dataset laid out as <root>/<scene_name>/{depth,mask,rgb,camera_params,...}."""

    @staticmethod
    def get_path(root, scene_name, frame_idx):
        path = os.path.join(root, scene_name, f"{frame_idx}")
        return path

    @staticmethod
    def get_label_path(root, scene_name):
        path = os.path.join(root, scene_name, "label.json")
        return path

    @staticmethod
    def get_sampled_model_points_path(root, scene_name):
        path = os.path.join(root, scene_name, "sampled_model_points.txt")
        return path

    @staticmethod
    def get_scene_seq_length(root, scene_name):
        # One camera-parameter JSON is stored per frame, so the frame count
        # equals the number of files in camera_params/.
        camera_params_path = os.path.join(root, scene_name, "camera_params")
        return len(os.listdir(camera_params_path))

    @staticmethod
    def load_downsampled_world_model_points(root, scene_name):
        model_path = DataLoadUtil.get_sampled_model_points_path(root, scene_name)
        model_points = np.loadtxt(model_path)
        return model_points

    @staticmethod
    def save_downsampled_world_model_points(root, scene_name, model_points):
        model_path = DataLoadUtil.get_sampled_model_points_path(root, scene_name)
        np.savetxt(model_path, model_points)

    @staticmethod
    def load_mesh_at(model_dir, object_name, world_object_pose):
        model_path = os.path.join(model_dir, object_name, "mesh.obj")
        mesh = trimesh.load(model_path)
        mesh.apply_transform(world_object_pose)
        return mesh

    @staticmethod
    def save_mesh_at(model_dir, output_dir, object_name, scene_name, world_object_pose):
        mesh = DataLoadUtil.load_mesh_at(model_dir, object_name, world_object_pose)
        model_path = os.path.join(output_dir, scene_name, "world_mesh.obj")
        mesh.export(model_path)

    @staticmethod
    def save_target_mesh_at_world_space(root, model_dir, scene_name):
        scene_info = DataLoadUtil.load_scene_info(root, scene_name)
        target_name = scene_info["target_name"]
        transformation = scene_info[target_name]
        location = transformation["location"]
        rotation_euler = transformation["rotation_euler"]
        # Build the target's 4x4 world pose from its Euler rotation and location.
        pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
        pose_mat[:3, 3] = location
        mesh = DataLoadUtil.load_mesh_at(model_dir, target_name, pose_mat)
        mesh_dir = os.path.join(root, scene_name, "mesh")
        if not os.path.exists(mesh_dir):
            os.makedirs(mesh_dir)
        model_path = os.path.join(mesh_dir, "world_target_mesh.obj")
        mesh.export(model_path)

    @staticmethod
    def load_scene_info(root, scene_name):
        scene_info_path = os.path.join(root, scene_name, "scene_info.json")
        with open(scene_info_path, "r") as f:
            scene_info = json.load(f)
        return scene_info

    @staticmethod
    def load_target_object_pose(root, scene_name):
        scene_info = DataLoadUtil.load_scene_info(root, scene_name)
        target_name = scene_info["target_name"]
        transformation = scene_info[target_name]
        location = transformation["location"]
        rotation_euler = transformation["rotation_euler"]
        pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
        pose_mat[:3, 3] = location
        return pose_mat

    @staticmethod
    def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False):
        """Load a 16-bit depth PNG and rescale it to metric depth in [min_depth, max_depth]."""

        def load_depth_from_real_path(real_path, min_depth, max_depth):
            depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
            depth = depth.astype(np.float32) / 65535.0
            depth_meters = min_depth + (max_depth - min_depth) * depth
            return depth_meters

        if binocular:
            depth_path_L = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_L.png")
            depth_path_R = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_R.png")
            depth_meters_L = load_depth_from_real_path(depth_path_L, min_depth, max_depth)
            depth_meters_R = load_depth_from_real_path(depth_path_R, min_depth, max_depth)
            return depth_meters_L, depth_meters_R
        else:
            depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png")
            depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
            return depth_meters

    @staticmethod
    def load_seg(path, binocular=False):
        if binocular:
            def clean_mask(mask_image):
                # Snap channel values that are within `threshold` of the pure
                # green/red label colors back to the exact labels, so the later
                # exact equality test against the label succeeds.
                green = [0, 255, 0, 255]
                red = [255, 0, 0, 255]
                threshold = 2
                mask_image = np.where(np.abs(mask_image - green) <= threshold, green, mask_image)
                mask_image = np.where(np.abs(mask_image - red) <= threshold, red, mask_image)
                return mask_image

            mask_path_L = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_L.png")
            mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
            mask_path_R = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_R.png")
            mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
            return mask_image_L, mask_image_R
        else:
            # NOTE: this branch returns a single-channel mask, while
            # get_target_point_cloud expects a 4-channel (RGBA) mask; callers
            # relying on the default target_mask_label may need the mask loaded
            # with cv2.IMREAD_UNCHANGED if the PNG stores RGBA labels.
            mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png")
            mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            return mask_image

    @staticmethod
    def load_label(path):
        with open(path, "r") as f:
            label_data = json.load(f)
        return label_data

    @staticmethod
    def load_rgb(path):
        rgb_path = os.path.join(os.path.dirname(path), "rgb", os.path.basename(path) + ".png")
        rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
        return rgb_image

    @staticmethod
    def cam_pose_transformation(cam_pose_before):
        # Flip the Y and Z axes of the camera frame (e.g. Blender/OpenGL
        # convention to OpenCV convention) by right-multiplying the pose.
        offset = np.asarray([
            [1,  0,  0, 0],
            [0, -1,  0, 0],
            [0,  0, -1, 0],
            [0,  0,  0, 1],
        ])
        cam_pose_after = cam_pose_before @ offset
        return cam_pose_after

    @staticmethod
    def load_cam_info(path, binocular=False):
        camera_params_path = os.path.join(os.path.dirname(path), "camera_params", os.path.basename(path) + ".json")
        with open(camera_params_path, "r") as f:
            label_data = json.load(f)
        cam_to_world = np.asarray(label_data["extrinsic"])
        cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
        cam_intrinsic = np.asarray(label_data["intrinsic"])
        cam_info = {
            "cam_to_world": cam_to_world,
            "cam_intrinsic": cam_intrinsic,
            "far_plane": label_data["far_plane"],
            "near_plane": label_data["near_plane"],
        }
        if binocular:
            cam_to_world_R = np.asarray(label_data["extrinsic_R"])
            cam_to_world_R = DataLoadUtil.cam_pose_transformation(cam_to_world_R)
            cam_info["cam_to_world_R"] = cam_to_world_R
        return cam_info

    @staticmethod
    def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255)):
        # Back-project every depth pixel into the camera frame with the pinhole
        # model, keep only pixels whose mask color equals target_mask_label,
        # and transform the kept points into world coordinates.
        h, w = depth.shape
        i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
        z = depth
        x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
        y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
        points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
        mask = mask.reshape(-1, 4)
        target_mask = (mask == target_mask_label).all(axis=-1)
        target_points_camera = points_camera[target_mask]
        target_points_camera_aug = np.concatenate(
            [target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1
        )
        target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3]
        return {
            "points_world": target_points_world,
            "points_camera": target_points_camera,
        }

    @staticmethod
    def get_target_point_cloud_world_from_path(path, binocular=False, random_downsample_N=65536,
                                               voxel_size=0.005, target_mask_label=(0, 255, 0, 255)):
        cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
        if binocular:
            # Build the target point cloud from both views, downsample each,
            # and keep only the points observed by both cameras (voxel overlap).
            depth_L, depth_R = DataLoadUtil.load_depth(path, cam_info["near_plane"], cam_info["far_plane"], binocular=True)
            mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
            point_cloud_L = DataLoadUtil.get_target_point_cloud(depth_L, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask_L, target_mask_label)["points_world"]
            point_cloud_R = DataLoadUtil.get_target_point_cloud(depth_R, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"], mask_R, target_mask_label)["points_world"]
            point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, random_downsample_N)
            point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, random_downsample_N)
            overlap_points = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size)
            return overlap_points
        else:
            depth = DataLoadUtil.load_depth(path, cam_info["near_plane"], cam_info["far_plane"])
            mask = DataLoadUtil.load_seg(path)
            point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask)["points_world"]
            return point_cloud

    @staticmethod
    def voxelize_points(points, voxel_size):
        # Quantize points onto a voxel grid; return the unique voxel indices and,
        # for each input point, the index of the voxel it falls into.
        voxel_indices = np.floor(points / voxel_size).astype(np.int32)
        unique_voxels, inverse_indices = np.unique(voxel_indices, axis=0, return_inverse=True)
        return unique_voxels, inverse_indices

    @staticmethod
    def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005):
        # Voxelize both clouds, intersect their voxel sets via a structured view
        # (so each (x, y, z) voxel index is treated as a single element), and
        # return the left-cloud points whose voxel also appears in the right cloud.
        voxels_L, indices_L = DataLoadUtil.voxelize_points(point_cloud_L, voxel_size)
        voxels_R, _ = DataLoadUtil.voxelize_points(point_cloud_R, voxel_size)

        voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
        voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
        overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
        mask_L = np.isin(indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0])
        overlapping_points = point_cloud_L[mask_L]
        return overlapping_points

    @staticmethod
    def load_points_normals(root, scene_name):
        points_path = os.path.join(root, scene_name, "points_and_normals.txt")
        points_normals = np.loadtxt(points_path)
        return points_normals
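

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of how these helpers might be chained for a binocular
# dataset laid out as <root>/<scene_name>/{depth,mask,camera_params,...}.
# The dataset root and scene name below are placeholders, and the loop
# assumes _L/_R depth and mask images exist for every frame.
if __name__ == "__main__":
    root = "/path/to/dataset"       # hypothetical dataset root
    scene_name = "example_scene"    # hypothetical scene folder

    seq_length = DataLoadUtil.get_scene_seq_length(root, scene_name)
    for frame_idx in range(seq_length):
        frame_path = DataLoadUtil.get_path(root, scene_name, frame_idx)
        # Fuse the left/right target point clouds into their voxel-level
        # overlap in world coordinates.
        pts_world = DataLoadUtil.get_target_point_cloud_world_from_path(frame_path, binocular=True)
        print(f"frame {frame_idx}: {pts_world.shape[0]} overlapping target points")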