import os
import numpy as np
import json
import cv2
import trimesh
import torch
from pts import PtsUtil
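# Utility collection for loading per-scene assets (meshes, depth maps,
# segmentation masks, normal maps, camera parameters, and point clouds)
# from the dataset directory layout used by this project. All methods are
# static and take a dataset root plus a scene name or frame path.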
class DataLoadUtil:
    TABLE_POSITION = np.asarray([0, 0, 0.8215])

    @staticmethod
    def get_display_table_info(root, scene_name):
        scene_info = DataLoadUtil.load_scene_info(root, scene_name)
        display_table_info = scene_info["display_table"]
        return display_table_info
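    # TABLE_POSITION is the table's base position in world coordinates; adding
    # the per-scene "height" from scene_info gives the top surface of the
    # display table, which other helpers can use as the world-space origin.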
    @staticmethod
    def get_display_table_top(root, scene_name):
        display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)[
            "height"
        ]
        display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray(
            [0, 0, display_table_height]
        )
        return display_table_top

    @staticmethod
    def get_path(root, scene_name, frame_idx):
        path = os.path.join(root, scene_name, f"{frame_idx}")
        return path

    @staticmethod
    def get_label_num(root, scene_name):
        label_dir = os.path.join(root, scene_name, "label")
        return len(os.listdir(label_dir))

    @staticmethod
    def get_label_path(root, scene_name, seq_idx):
        label_dir = os.path.join(root, scene_name, "label")
        if not os.path.exists(label_dir):
            os.makedirs(label_dir)
        path = os.path.join(label_dir, f"{seq_idx}.json")
        return path

    @staticmethod
    def get_label_path_old(root, scene_name):
        path = os.path.join(root, scene_name, "label.json")
        return path

    @staticmethod
    def get_scene_seq_length(root, scene_name):
        camera_params_path = os.path.join(root, scene_name, "camera_params")
        return len(os.listdir(camera_params_path))

    @staticmethod
    def load_mesh_at(model_dir, object_name, world_object_pose):
        model_path = os.path.join(model_dir, object_name, "mesh.obj")
        mesh = trimesh.load(model_path)
        mesh.apply_transform(world_object_pose)
        return mesh

    @staticmethod
    def get_bbox_diag(model_dir, object_name):
        model_path = os.path.join(model_dir, object_name, "mesh.obj")
        mesh = trimesh.load(model_path)
        bbox = mesh.bounding_box.extents
        diagonal_length = np.linalg.norm(bbox)
        return diagonal_length

    @staticmethod
    def save_mesh_at(model_dir, output_dir, object_name, scene_name, world_object_pose):
        mesh = DataLoadUtil.load_mesh_at(model_dir, object_name, world_object_pose)
        model_path = os.path.join(output_dir, scene_name, "world_mesh.obj")
        mesh.export(model_path)
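    # Bakes the target object's scene pose into its mesh and exports it to
    # <root>/<scene_name>/mesh/world_target_mesh.obj, optionally shifting the
    # location so the display-table top acts as the world-space origin.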
    @staticmethod
    def save_target_mesh_at_world_space(
        root, model_dir, scene_name, display_table_as_world_space_origin=True
    ):
        scene_info = DataLoadUtil.load_scene_info(root, scene_name)
        target_name = scene_info["target_name"]
        transformation = scene_info[target_name]
        if display_table_as_world_space_origin:
            location = transformation["location"] - DataLoadUtil.get_display_table_top(
                root, scene_name
            )
        else:
            location = transformation["location"]
        rotation_euler = transformation["rotation_euler"]
        pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
        pose_mat[:3, 3] = location

        mesh = DataLoadUtil.load_mesh_at(model_dir, target_name, pose_mat)
        mesh_dir = os.path.join(root, scene_name, "mesh")
        if not os.path.exists(mesh_dir):
            os.makedirs(mesh_dir)
        model_path = os.path.join(mesh_dir, "world_target_mesh.obj")
        mesh.export(model_path)

    @staticmethod
    def load_scene_info(root, scene_name):
        scene_info_path = os.path.join(root, scene_name, "scene_info.json")
        with open(scene_info_path, "r") as f:
            scene_info = json.load(f)
        return scene_info

    @staticmethod
    def load_target_pts_num_dict(root, scene_name):
        target_pts_num_path = os.path.join(root, scene_name, "target_pts_num.json")
        with open(target_pts_num_path, "r") as f:
            target_pts_num_dict = json.load(f)
        return target_pts_num_dict

    @staticmethod
    def load_target_object_pose(root, scene_name):
        scene_info = DataLoadUtil.load_scene_info(root, scene_name)
        target_name = scene_info["target_name"]
        transformation = scene_info[target_name]
        location = transformation["location"]
        rotation_euler = transformation["rotation_euler"]
        pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
        pose_mat[:3, 3] = location
        return pose_mat
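    # Depth images are stored as 16-bit PNGs; pixel values are mapped linearly
    # from [0, 65535] to [min_depth, max_depth] meters. With binocular=True the
    # left/right pair (<frame>_L.png / <frame>_R.png) is returned instead.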
    @staticmethod
    def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False):

        def load_depth_from_real_path(real_path, min_depth, max_depth):
            depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
            depth = depth.astype(np.float32) / 65535.0
            depth_meters = min_depth + (max_depth - min_depth) * depth
            return depth_meters

        if binocular:
            depth_path_L = os.path.join(
                os.path.dirname(path), "depth", os.path.basename(path) + "_L.png"
            )
            depth_path_R = os.path.join(
                os.path.dirname(path), "depth", os.path.basename(path) + "_R.png"
            )
            depth_meters_L = load_depth_from_real_path(
                depth_path_L, min_depth, max_depth
            )
            depth_meters_R = load_depth_from_real_path(
                depth_path_R, min_depth, max_depth
            )
            return depth_meters_L, depth_meters_R
        else:
            depth_path = os.path.join(
                os.path.dirname(path), "depth", os.path.basename(path) + ".png"
            )
            depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
            return depth_meters
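    # Segmentation masks are RGBA images. In the binocular branch, clean_mask
    # snaps channel values that lie within a small threshold of the green/red
    # label colors back to the exact values, presumably so later exact
    # comparisons against target_mask_label still succeed.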
    @staticmethod
    def load_seg(path, binocular=False, left_only=False):
        if binocular and not left_only:

            def clean_mask(mask_image):
                green = [0, 255, 0, 255]
                red = [255, 0, 0, 255]
                threshold = 2
                mask_image = np.where(
                    np.abs(mask_image - green) <= threshold, green, mask_image
                )
                mask_image = np.where(
                    np.abs(mask_image - red) <= threshold, red, mask_image
                )
                return mask_image

            mask_path_L = os.path.join(
                os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
            )
            mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
            mask_path_R = os.path.join(
                os.path.dirname(path), "mask", os.path.basename(path) + "_R.png"
            )
            mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
            return mask_image_L, mask_image_R
        else:
            if binocular and left_only:
                mask_path = os.path.join(
                    os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
                )
            else:
                mask_path = os.path.join(
                    os.path.dirname(path), "mask", os.path.basename(path) + ".png"
                )
            mask_image = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
            return mask_image
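    # Normal maps are stored as 8-bit images; channel values are rescaled from
    # [0, 255] to [-1, 1].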
    @staticmethod
    def load_normal(path, binocular=False, left_only=False):
        if binocular and not left_only:
            normal_path_L = os.path.join(
                os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
            )
            normal_image_L = cv2.imread(normal_path_L, cv2.IMREAD_COLOR)
            normal_path_R = os.path.join(
                os.path.dirname(path), "normal", os.path.basename(path) + "_R.png"
            )
            normal_image_R = cv2.imread(normal_path_R, cv2.IMREAD_COLOR)
            normalized_normal_image_L = normal_image_L / 255.0 * 2.0 - 1.0
            normalized_normal_image_R = normal_image_R / 255.0 * 2.0 - 1.0
            return normalized_normal_image_L, normalized_normal_image_R
        else:
            if binocular and left_only:
                normal_path = os.path.join(
                    os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
                )
            else:
                normal_path = os.path.join(
                    os.path.dirname(path), "normal", os.path.basename(path) + ".png"
                )
            normal_image = cv2.imread(normal_path, cv2.IMREAD_COLOR)
            normalized_normal_image = normal_image / 255.0 * 2.0 - 1.0
            return normalized_normal_image

    @staticmethod
    def load_label(path):
        with open(path, "r") as f:
            label_data = json.load(f)
        return label_data

    @staticmethod
    def load_rgb(path):
        rgb_path = os.path.join(
            os.path.dirname(path), "rgb", os.path.basename(path) + ".png"
        )
        rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
        return rgb_image

    @staticmethod
    def load_from_preprocessed_pts(path):
        npy_path = os.path.join(
            os.path.dirname(path), "pts", os.path.basename(path) + ".npy"
        )
        pts = np.load(npy_path)
        return pts
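    # Post-multiplying by this offset flips the camera's Y and Z axes. This
    # looks like the usual conversion between OpenGL/Blender-style and
    # OpenCV-style camera poses, though the convention is an assumption not
    # stated in the data files.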
    @staticmethod
    def cam_pose_transformation(cam_pose_before):
        offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
        cam_pose_after = cam_pose_before @ offset
        return cam_pose_after
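    # Reads <frame>.json from the scene's camera_params folder, converts the
    # extrinsics with cam_pose_transformation, and optionally re-expresses them
    # with the display-table top as the world origin. For binocular data the
    # right-camera pose ("extrinsic_R") and the auxiliary "extrinsic_cam_object"
    # pose are converted the same way.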
    @staticmethod
    def load_cam_info(path, binocular=False, display_table_as_world_space_origin=True):
        scene_dir = os.path.dirname(path)
        root_dir = os.path.dirname(scene_dir)
        scene_name = os.path.basename(scene_dir)
        camera_params_path = os.path.join(
            os.path.dirname(path), "camera_params", os.path.basename(path) + ".json"
        )
        with open(camera_params_path, "r") as f:
            label_data = json.load(f)
        cam_to_world = np.asarray(label_data["extrinsic"])
        cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
        world_to_display_table = np.eye(4)
        world_to_display_table[:3, 3] = -DataLoadUtil.get_display_table_top(
            root_dir, scene_name
        )
        if display_table_as_world_space_origin:
            cam_to_world = np.dot(world_to_display_table, cam_to_world)
        cam_intrinsic = np.asarray(label_data["intrinsic"])
        cam_info = {
            "cam_to_world": cam_to_world,
            "cam_intrinsic": cam_intrinsic,
            "far_plane": label_data["far_plane"],
            "near_plane": label_data["near_plane"],
        }
        if binocular:
            cam_to_world_R = np.asarray(label_data["extrinsic_R"])
            cam_to_world_R = DataLoadUtil.cam_pose_transformation(cam_to_world_R)
            cam_to_world_O = np.asarray(label_data["extrinsic_cam_object"])
            cam_to_world_O = DataLoadUtil.cam_pose_transformation(cam_to_world_O)
            if display_table_as_world_space_origin:
                cam_to_world_O = np.dot(world_to_display_table, cam_to_world_O)
                cam_to_world_R = np.dot(world_to_display_table, cam_to_world_R)
            cam_info["cam_to_world_O"] = cam_to_world_O
            cam_info["cam_to_world_R"] = cam_to_world_R
        return cam_info
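    # Recovers the cam_O pose in the original world frame from a left-camera
    # pose cam_L (given in the display-table frame) and the fixed
    # cam_O-to-cam_L offset, then applies cam_pose_transformation to the result
    # (the offset matrix is its own inverse, so this undoes the earlier flip).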
    @staticmethod
    def get_real_cam_O_from_cam_L(
        cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True
    ):
        root_dir = os.path.dirname(scene_path)
        scene_name = os.path.basename(scene_path)
        if isinstance(cam_L, torch.Tensor):
            cam_L = cam_L.cpu().numpy()
        nO_to_display_table_pose = cam_L @ cam_O_to_cam_L
        if display_table_as_world_space_origin:
            display_table_to_world = np.eye(4)
            display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top(
                root_dir, scene_name
            )
            nO_to_world_pose = np.dot(display_table_to_world, nO_to_display_table_pose)
        else:
            # without the origin shift, the display-table-frame pose already is the world pose
            nO_to_world_pose = nO_to_display_table_pose
        nO_to_world_pose = DataLoadUtil.cam_pose_transformation(nO_to_world_pose)
        return nO_to_world_pose
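    # Back-projects the depth map through the pinhole intrinsics, keeps only
    # pixels whose RGBA mask value equals target_mask_label, and transforms the
    # remaining points into world space with cam_extrinsic (a cam-to-world pose).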
    @staticmethod
    def get_target_point_cloud(
        depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255), require_full_points=False
    ):
        h, w = depth.shape
        i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")

        z = depth
        x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
        y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]

        points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
        mask = mask.reshape(-1, 4)

        target_mask = (mask == target_mask_label).all(axis=-1)

        target_points_camera = points_camera[target_mask]
        target_points_camera_aug = np.concatenate(
            [target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1
        )

        target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3]
        data = {
            "points_world": target_points_world,
            "points_camera": target_points_camera,
        }
        return data

    @staticmethod
    def get_point_cloud(depth, cam_intrinsic, cam_extrinsic):
        h, w = depth.shape
        i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")

        z = depth
        x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
        y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]

        points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
        points_camera_aug = np.concatenate(
            [points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
        )

        points_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3]
        return {"points_world": points_world, "points_camera": points_camera}
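    # Convenience wrapper: loads depth, mask, and camera info for one frame and
    # returns the target's world-space points. In the binocular case both views
    # are back-projected, randomly downsampled, and intersected voxel-wise via
    # PtsUtil.get_overlapping_points, presumably to keep only points confirmed
    # by both cameras.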
    @staticmethod
    def get_target_point_cloud_world_from_path(
        path,
        binocular=False,
        random_downsample_N=65536,
        voxel_size=0.005,
        target_mask_label=(0, 255, 0, 255),
        display_table_mask_label=(0, 0, 255, 255),
        get_display_table_pts=False,
        require_normal=False,
    ):
        cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
        if binocular:
            depth_L, depth_R = DataLoadUtil.load_depth(
                path, cam_info["near_plane"], cam_info["far_plane"], binocular=True
            )
            mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
            point_cloud_L = DataLoadUtil.get_target_point_cloud(
                depth_L,
                cam_info["cam_intrinsic"],
                cam_info["cam_to_world"],
                mask_L,
                target_mask_label,
            )["points_world"]
            point_cloud_R = DataLoadUtil.get_target_point_cloud(
                depth_R,
                cam_info["cam_intrinsic"],
                cam_info["cam_to_world_R"],
                mask_R,
                target_mask_label,
            )["points_world"]
            point_cloud_L = PtsUtil.random_downsample_point_cloud(
                point_cloud_L, random_downsample_N
            )
            point_cloud_R = PtsUtil.random_downsample_point_cloud(
                point_cloud_R, random_downsample_N
            )
            overlap_points = PtsUtil.get_overlapping_points(
                point_cloud_L, point_cloud_R, voxel_size
            )
            return overlap_points
        else:
            depth = DataLoadUtil.load_depth(
                path, cam_info["near_plane"], cam_info["far_plane"]
            )
            mask = DataLoadUtil.load_seg(path)
            point_cloud = DataLoadUtil.get_target_point_cloud(
                depth, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask
            )["points_world"]
            return point_cloud
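    # Loads the per-scene "points_and_normals.txt" (expected to hold xyz plus
    # normal columns) and, if requested, shifts the xyz columns so the
    # display-table top becomes the origin.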
    @staticmethod
    def load_points_normals(root, scene_name, display_table_as_world_space_origin=True):
        points_path = os.path.join(root, scene_name, "points_and_normals.txt")
        points_normals = np.loadtxt(points_path)
        if display_table_as_world_space_origin:
            points_normals[:, :3] = points_normals[
                :, :3
            ] - DataLoadUtil.get_display_table_top(root, scene_name)
        return points_normals
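

# Minimal usage sketch: shows how the helpers above are typically chained for a
# single frame. The dataset root and scene name are hypothetical placeholders,
# so this block is only a guide, not a guaranteed-runnable demo for any dataset.
if __name__ == "__main__":
    example_root = "/path/to/dataset"  # hypothetical dataset root
    example_scene = "scene_0000"       # hypothetical scene name

    num_frames = DataLoadUtil.get_scene_seq_length(example_root, example_scene)
    frame_path = DataLoadUtil.get_path(example_root, example_scene, 0)

    # Camera parameters and the binocular target point cloud for frame 0.
    cam_info = DataLoadUtil.load_cam_info(frame_path, binocular=True)
    target_pts = DataLoadUtil.get_target_point_cloud_world_from_path(
        frame_path, binocular=True
    )
    print(f"{num_frames} frames; frame 0 target points: {len(target_pts)}")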