This commit is contained in:
hofee 2024-10-04 16:35:26 +00:00
commit ee7537c315
16 changed files with 845 additions and 409 deletions

22
TODO.md Normal file
View File

@ -0,0 +1,22 @@
# TODO
## 预处理数据
### 1. 生成view阶段
**input**: 物体mesh
### 2. 生成label阶段
**input**: 目标物体点云、目标物体点云法线、桌面扫描点、被拍到的桌面扫描点
**可以删掉的数据**: mask、normal
### 3. 训练阶段
**input**: 完整点云、pose、label
**可以删掉的数据**depth
### view生成后
预处理目标物体点云、目标物体点云法线、桌面扫描点、被拍到的桌面扫描点、完整点云
删除depth、mask、normal
### label生成后
只上传完整点云、pose、label

View File

@ -12,7 +12,8 @@ runner:
generate:
voxel_threshold: 0.01
overlap_threshold: 0.5
soft_overlap_threshold: 0.3
hard_overlap_threshold: 0.6
filter_degree: 75
to_specified_dir: True # if True, output_dir is used, otherwise, root_dir is used
save_points: True
@ -20,15 +21,17 @@ runner:
save_best_combined_points: False
save_mesh: True
overwrite: False
seq_num: 50
seq_num: 10
dataset_list:
- OmniObject3d
datasets:
OmniObject3d:
#"/media/hofee/data/data/temp_output"
root_dir: "/media/hofee/data/data/sample_data/view_data"
root_dir: "/media/hofee/repository/new_full_box_data"
model_dir: "/media/hofee/data/data/scaled_object_meshes"
from: 0
to: -1 # -1 means end
#output_dir: "/media/hofee/data/data/label_output"

View File

@ -7,13 +7,18 @@ runner:
name: debug
root_dir: experiments
generate:
port: 5002
from: 2200
to: 2300 # -1 means all
object_dir: /media/hofee/data/data/scaled_object_meshes
table_model_path: /media/hofee/data/data/others/table.obj
output_dir: /media/hofee/repository/new_nbv_reconstruction_data_512
output_dir: /media/hofee/repository/new_data_with_normal
binocular_vision: true
plane_size: 10
max_views: 512
min_views: 64
min_views: 128
random_view_ratio: 0.2
min_cam_table_included_degree: 20
max_diag: 0.7
min_diag: 0.1
random_config:
@ -22,12 +27,6 @@ runner:
max_height: 0.15
min_radius: 0.3
max_radius: 0.5
min_R: 0.05
max_R: 0.3
min_G: 0.05
max_G: 0.3
min_B: 0.05
max_B: 0.3
display_object:
min_x: 0
max_x: 0.03

View File

@ -0,0 +1,100 @@
import torch
from torch import nn
import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log
@stereotype.pipeline("nbv_reconstruction_global_pts_n_num_pipeline")
class NBVReconstructionGlobalPointsPipeline(nn.Module):
def __init__(self, config):
super(NBVReconstructionGlobalPointsPipeline, self).__init__()
self.config = config
self.module_config = config["modules"]
self.pts_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pts_encoder"])
self.pose_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_encoder"])
self.pose_n_num_seq_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_n_num_seq_encoder"])
self.view_finder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["view_finder"])
self.pts_num_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pts_num_encoder"])
self.eps = float(self.config["eps"])
self.enable_global_scanned_feat = self.config["global_scanned_feat"]
def forward(self, data):
mode = data["mode"]
if mode == namespace.Mode.TRAIN:
return self.forward_train(data)
elif mode == namespace.Mode.TEST:
return self.forward_test(data)
else:
Log.error("Unknown mode: {}".format(mode), True)
def pertube_data(self, gt_delta_9d):
bs = gt_delta_9d.shape[0]
random_t = torch.rand(bs, device=gt_delta_9d.device) * (1. - self.eps) + self.eps
random_t = random_t.unsqueeze(-1)
mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t)
std = std.view(-1, 1)
z = torch.randn_like(gt_delta_9d)
perturbed_x = mu + z * std
target_score = - z * std / (std ** 2)
return perturbed_x, random_t, target_score, std
def forward_train(self, data):
main_feat = self.get_main_feat(data)
''' get std '''
best_to_world_pose_9d_batch = data["best_to_world_pose_9d"]
perturbed_x, random_t, target_score, std = self.pertube_data(best_to_world_pose_9d_batch)
input_data = {
"sampled_pose": perturbed_x,
"t": random_t,
"main_feat": main_feat,
}
estimated_score = self.view_finder(input_data)
output = {
"estimated_score": estimated_score,
"target_score": target_score,
"std": std
}
return output
def forward_test(self,data):
main_feat = self.get_main_feat(data)
estimated_delta_rot_9d, in_process_sample = self.view_finder.next_best_view(main_feat)
result = {
"pred_pose_9d": estimated_delta_rot_9d,
"in_process_sample": in_process_sample
}
return result
def get_main_feat(self, data):
scanned_n_to_world_pose_9d_batch = data['scanned_n_to_world_pose_9d']
scanned_target_pts_num_batch = data['scanned_target_points_num']
device = next(self.parameters()).device
embedding_list_batch = []
for scanned_n_to_world_pose_9d,scanned_target_pts_num in zip(scanned_n_to_world_pose_9d_batch,scanned_target_pts_num_batch):
scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d.to(device)
scanned_target_pts_num = scanned_target_pts_num.to(device)
pose_feat_seq = self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d)
pts_num_feat_seq = self.pts_num_encoder.encode_pts_num(scanned_target_pts_num)
embedding_list_batch.append(torch.cat([pose_feat_seq, pts_num_feat_seq], dim=-1))
main_feat = self.pose_n_num_seq_encoder.encode_sequence(embedding_list_batch)
combined_scanned_pts_batch = data['combined_scanned_pts']
global_scanned_feat = self.pts_encoder.encode_points(combined_scanned_pts_batch)
main_feat = torch.cat([main_feat, global_scanned_feat], dim=-1)
if torch.isnan(main_feat).any():
Log.error("nan in main_feat", True)
return main_feat

View File

@ -7,6 +7,7 @@ from PytorchBoot.utils.log_util import Log
import torch
import os
import sys
sys.path.append(r"/home/data/hofee/project/nbv_rec/nbv_reconstruction")
from utils.data_load import DataLoadUtil
@ -29,7 +30,6 @@ class NBVReconstructionDataset(BaseDataset):
self.cache = config.get("cache")
self.load_from_preprocess = config.get("load_from_preprocess", False)
if self.type == namespace.Mode.TEST:
self.model_dir = config["model_dir"]
self.filter_degree = config["filter_degree"]
@ -40,9 +40,7 @@ class NBVReconstructionDataset(BaseDataset):
expr_root = ConfigManager.get("runner", "experiment", "root_dir")
expr_name = ConfigManager.get("runner", "experiment", "name")
self.cache_dir = os.path.join(expr_root, expr_name, "cache")
#self.preprocess_cache()
# self.preprocess_cache()
def load_scene_name_list(self):
scene_name_list = []
@ -51,7 +49,7 @@ class NBVReconstructionDataset(BaseDataset):
scene_name = line.strip()
scene_name_list.append(scene_name)
return scene_name_list
def get_datalist(self):
datalist = []
for scene_name in self.scene_name_list:
@ -60,7 +58,9 @@ class NBVReconstructionDataset(BaseDataset):
max_coverage_rate_list = []
for seq_idx in range(seq_num):
label_path = DataLoadUtil.get_label_path(self.root_dir, scene_name, seq_idx)
label_path = DataLoadUtil.get_label_path(
self.root_dir, scene_name, seq_idx
)
label_data = DataLoadUtil.load_label(label_path)
max_coverage_rate = label_data["max_coverage_rate"]
if max_coverage_rate > scene_max_coverage_rate:
@ -69,20 +69,24 @@ class NBVReconstructionDataset(BaseDataset):
mean_coverage_rate = np.mean(max_coverage_rate_list)
for seq_idx in range(seq_num):
label_path = DataLoadUtil.get_label_path(self.root_dir, scene_name, seq_idx)
label_path = DataLoadUtil.get_label_path(
self.root_dir, scene_name, seq_idx
)
label_data = DataLoadUtil.load_label(label_path)
if max_coverage_rate_list[seq_idx] > mean_coverage_rate - 0.1:
for data_pair in label_data["data_pairs"]:
scanned_views = data_pair[0]
next_best_view = data_pair[1]
datalist.append({
"scanned_views": scanned_views,
"next_best_view": next_best_view,
"seq_max_coverage_rate": max_coverage_rate,
"scene_name": scene_name,
"label_idx": seq_idx,
"scene_max_coverage_rate": scene_max_coverage_rate
})
datalist.append(
{
"scanned_views": scanned_views,
"next_best_view": next_best_view,
"seq_max_coverage_rate": max_coverage_rate,
"scene_name": scene_name,
"label_idx": seq_idx,
"scene_max_coverage_rate": scene_max_coverage_rate,
}
)
return datalist
def preprocess_cache(self):
@ -90,7 +94,7 @@ class NBVReconstructionDataset(BaseDataset):
for item_idx in range(len(self.datalist)):
self.__getitem__(item_idx)
Log.success("finish preprocessing cache.")
def load_from_cache(self, scene_name, curr_frame_idx):
cache_name = f"{scene_name}_{curr_frame_idx}.txt"
cache_path = os.path.join(self.cache_dir, cache_name)
@ -99,7 +103,7 @@ class NBVReconstructionDataset(BaseDataset):
return data
else:
return None
def save_to_cache(self, scene_name, curr_frame_idx, data):
cache_name = f"{scene_name}_{curr_frame_idx}.txt"
cache_path = os.path.join(self.cache_dir, cache_name)
@ -107,125 +111,172 @@ class NBVReconstructionDataset(BaseDataset):
np.savetxt(cache_path, data)
except Exception as e:
Log.error(f"Save cache failed: {e}")
# ----- Debug Trace ----- #
import ipdb; ipdb.set_trace()
# ------------------------ #
def __getitem__(self, index):
data_item_info = self.datalist[index]
scanned_views = data_item_info["scanned_views"]
nbv = data_item_info["next_best_view"]
max_coverage_rate = data_item_info["seq_max_coverage_rate"]
scene_name = data_item_info["scene_name"]
scanned_views_pts, scanned_coverages_rate, scanned_n_to_world_pose = [], [], []
(
scanned_views_pts,
scanned_coverages_rate,
scanned_n_to_world_pose,
scanned_target_pts_num,
) = ([], [], [], [])
target_pts_num_dict = DataLoadUtil.load_target_pts_num_dict(
self.root_dir, scene_name
)
for view in scanned_views:
frame_idx = view[0]
coverage_rate = view[1]
view_path = DataLoadUtil.get_path(self.root_dir, scene_name, frame_idx)
cam_info = DataLoadUtil.load_cam_info(view_path, binocular=True)
target_pts_num = target_pts_num_dict[frame_idx]
n_to_world_pose = cam_info["cam_to_world"]
nR_to_world_pose = cam_info["cam_to_world_R"]
nR_to_world_pose = cam_info["cam_to_world_R"]
if self.load_from_preprocess:
downsampled_target_point_cloud = DataLoadUtil.load_from_preprocessed_pts(view_path)
downsampled_target_point_cloud = (
DataLoadUtil.load_from_preprocessed_pts(view_path)
)
else:
cached_data = None
if self.cache:
cached_data = self.load_from_cache(scene_name, frame_idx)
if cached_data is None:
print("load depth")
depth_L, depth_R = DataLoadUtil.load_depth(view_path, cam_info['near_plane'], cam_info['far_plane'], binocular=True)
point_cloud_L = DataLoadUtil.get_point_cloud(depth_L, cam_info['cam_intrinsic'], n_to_world_pose)['points_world']
point_cloud_R = DataLoadUtil.get_point_cloud(depth_R, cam_info['cam_intrinsic'], nR_to_world_pose)['points_world']
point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, 65536)
point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, 65536)
overlap_points = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R)
downsampled_target_point_cloud = PtsUtil.random_downsample_point_cloud(overlap_points, self.pts_num)
depth_L, depth_R = DataLoadUtil.load_depth(
view_path,
cam_info["near_plane"],
cam_info["far_plane"],
binocular=True,
)
point_cloud_L = DataLoadUtil.get_point_cloud(
depth_L, cam_info["cam_intrinsic"], n_to_world_pose
)["points_world"]
point_cloud_R = DataLoadUtil.get_point_cloud(
depth_R, cam_info["cam_intrinsic"], nR_to_world_pose
)["points_world"]
point_cloud_L = PtsUtil.random_downsample_point_cloud(
point_cloud_L, 65536
)
point_cloud_R = PtsUtil.random_downsample_point_cloud(
point_cloud_R, 65536
)
overlap_points = PtsUtil.get_overlapping_points(
point_cloud_L, point_cloud_R
)
downsampled_target_point_cloud = (
PtsUtil.random_downsample_point_cloud(
overlap_points, self.pts_num
)
)
if self.cache:
self.save_to_cache(scene_name, frame_idx, downsampled_target_point_cloud)
self.save_to_cache(
scene_name, frame_idx, downsampled_target_point_cloud
)
else:
downsampled_target_point_cloud = cached_data
scanned_views_pts.append(downsampled_target_point_cloud)
scanned_coverages_rate.append(coverage_rate)
n_to_world_6d = PoseUtil.matrix_to_rotation_6d_numpy(np.asarray(n_to_world_pose[:3,:3]))
n_to_world_trans = n_to_world_pose[:3,3]
scanned_coverages_rate.append(coverage_rate)
n_to_world_6d = PoseUtil.matrix_to_rotation_6d_numpy(
np.asarray(n_to_world_pose[:3, :3])
)
n_to_world_trans = n_to_world_pose[:3, 3]
n_to_world_9d = np.concatenate([n_to_world_6d, n_to_world_trans], axis=0)
scanned_n_to_world_pose.append(n_to_world_9d)
scanned_target_pts_num.append(target_pts_num)
nbv_idx, nbv_coverage_rate = nbv[0], nbv[1]
nbv_path = DataLoadUtil.get_path(self.root_dir, scene_name, nbv_idx)
cam_info = DataLoadUtil.load_cam_info(nbv_path)
best_frame_to_world = cam_info["cam_to_world"]
best_to_world_6d = PoseUtil.matrix_to_rotation_6d_numpy(np.asarray(best_frame_to_world[:3,:3]))
best_to_world_trans = best_frame_to_world[:3,3]
best_to_world_9d = np.concatenate([best_to_world_6d, best_to_world_trans], axis=0)
best_to_world_6d = PoseUtil.matrix_to_rotation_6d_numpy(
np.asarray(best_frame_to_world[:3, :3])
)
best_to_world_trans = best_frame_to_world[:3, 3]
best_to_world_9d = np.concatenate(
[best_to_world_6d, best_to_world_trans], axis=0
)
combined_scanned_views_pts = np.concatenate(scanned_views_pts, axis=0)
voxel_downsampled_combined_scanned_pts_np = PtsUtil.voxel_downsample_point_cloud(combined_scanned_views_pts, 0.002)
random_downsampled_combined_scanned_pts_np = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, self.pts_num)
voxel_downsampled_combined_scanned_pts_np = (
PtsUtil.voxel_downsample_point_cloud(combined_scanned_views_pts, 0.002)
)
random_downsampled_combined_scanned_pts_np = (
PtsUtil.random_downsample_point_cloud(
voxel_downsampled_combined_scanned_pts_np, self.pts_num
)
)
data_item = {
"scanned_pts": np.asarray(scanned_views_pts,dtype=np.float32),
"combined_scanned_pts": np.asarray(random_downsampled_combined_scanned_pts_np,dtype=np.float32),
"scanned_pts": np.asarray(scanned_views_pts, dtype=np.float32),
"combined_scanned_pts": np.asarray(
random_downsampled_combined_scanned_pts_np, dtype=np.float32
),
"scanned_coverage_rate": scanned_coverages_rate,
"scanned_n_to_world_pose_9d": np.asarray(scanned_n_to_world_pose,dtype=np.float32),
"scanned_n_to_world_pose_9d": np.asarray(
scanned_n_to_world_pose, dtype=np.float32
),
"best_coverage_rate": nbv_coverage_rate,
"best_to_world_pose_9d": np.asarray(best_to_world_9d,dtype=np.float32),
"best_to_world_pose_9d": np.asarray(best_to_world_9d, dtype=np.float32),
"seq_max_coverage_rate": max_coverage_rate,
"scene_name": scene_name
"scene_name": scene_name,
"scanned_target_points_num": np.asarray(
scanned_target_pts_num, dtype=np.int32
),
}
# if self.type == namespace.Mode.TEST:
# diag = DataLoadUtil.get_bbox_diag(self.model_dir, scene_name)
# voxel_threshold = diag*0.02
# model_points_normals = DataLoadUtil.load_points_normals(self.root_dir, scene_name)
# pts_list = []
# for view in scanned_views:
# frame_idx = view[0]
# view_path = DataLoadUtil.get_path(self.root_dir, scene_name, frame_idx)
# point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(view_path, binocular=True)
# cam_params = DataLoadUtil.load_cam_info(view_path, binocular=True)
# sampled_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=self.filter_degree)
# pts_list.append(sampled_point_cloud)
# nL_to_world_pose = cam_params["cam_to_world"]
# nO_to_world_pose = cam_params["cam_to_world_O"]
# nO_to_nL_pose = np.dot(np.linalg.inv(nL_to_world_pose), nO_to_world_pose)
# data_item["scanned_target_pts_list"] = pts_list
# data_item["model_points_normals"] = model_points_normals
# data_item["voxel_threshold"] = voxel_threshold
# data_item["filter_degree"] = self.filter_degree
# data_item["scene_path"] = os.path.join(self.root_dir, scene_name)
# data_item["first_frame_to_world"] = np.asarray(first_frame_to_world, dtype=np.float32)
# data_item["nO_to_nL_pose"] = np.asarray(nO_to_nL_pose, dtype=np.float32)
return data_item
def __len__(self):
return len(self.datalist)
def get_collate_fn(self):
def collate_fn(batch):
collate_data = {}
collate_data["scanned_pts"] = [torch.tensor(item['scanned_pts']) for item in batch]
collate_data["scanned_n_to_world_pose_9d"] = [torch.tensor(item['scanned_n_to_world_pose_9d']) for item in batch]
collate_data["best_to_world_pose_9d"] = torch.stack([torch.tensor(item['best_to_world_pose_9d']) for item in batch])
collate_data["combined_scanned_pts"] = torch.stack([torch.tensor(item['combined_scanned_pts']) for item in batch])
collate_data["scanned_pts"] = [
torch.tensor(item["scanned_pts"]) for item in batch
]
collate_data["scanned_n_to_world_pose_9d"] = [
torch.tensor(item["scanned_n_to_world_pose_9d"]) for item in batch
]
collate_data["scanned_target_points_num"] = [
torch.tensor(item["scanned_target_points_num"]) for item in batch
]
collate_data["best_to_world_pose_9d"] = torch.stack(
[torch.tensor(item["best_to_world_pose_9d"]) for item in batch]
)
collate_data["combined_scanned_pts"] = torch.stack(
[torch.tensor(item["combined_scanned_pts"]) for item in batch]
)
if "first_frame_to_world" in batch[0]:
collate_data["first_frame_to_world"] = torch.stack([torch.tensor(item["first_frame_to_world"]) for item in batch])
collate_data["first_frame_to_world"] = torch.stack(
[torch.tensor(item["first_frame_to_world"]) for item in batch]
)
for key in batch[0].keys():
if key not in ["scanned_pts", "scanned_n_to_world_pose_9d", "best_to_world_pose_9d", "first_frame_to_world", "combined_scanned_pts"]:
if key not in [
"scanned_pts",
"scanned_n_to_world_pose_9d",
"best_to_world_pose_9d",
"first_frame_to_world",
"combined_scanned_pts",
"scanned_target_points_num",
]:
collate_data[key] = [item[key] for item in batch]
return collate_data
return collate_fn
# -------------- Debug ---------------- #
if __name__ == "__main__":
import torch
seed = 0
torch.manual_seed(seed)
np.random.seed(seed)
@ -244,41 +295,13 @@ if __name__ == "__main__":
}
ds = NBVReconstructionDataset(config)
print(len(ds))
#ds.__getitem__(10)
# ds.__getitem__(10)
dl = ds.get_loader(shuffle=True)
for idx, data in enumerate(dl):
data = ds.process_batch(data, "cuda:0")
print(data)
# ------ Debug Start ------
import ipdb;ipdb.set_trace()
import ipdb
ipdb.set_trace()
# ------ Debug End ------
#
# for idx, data in enumerate(dl):
# cnt=0
# print(data["scene_name"])
# print(data["scanned_coverage_rate"])
# print(data["best_coverage_rate"])
# for pts in data["scanned_pts"][0]:
# #np.savetxt(f"pts_{cnt}.txt", pts)
# cnt+=1
# #np.savetxt("best_pts.txt", best_pts)
# for key, value in data.items():
# if isinstance(value, torch.Tensor):
# print(key, ":" ,value.shape)
# else:
# print(key, ":" ,len(value))
# if key == "scanned_n_to_world_pose_9d":
# for val in value:
# print(val.shape)
# if key == "scanned_pts":
# print("scanned_pts")
# for val in value:
# print(val.shape)
# cnt = 0
# for v in val:
# import ipdb;ipdb.set_trace()
# np.savetxt(f"pts_{cnt}.txt", v)
# cnt+=1
# print()

View File

@ -89,7 +89,7 @@ class SeqNBVReconstructionDataset(BaseDataset):
first_point_cloud_L = PtsUtil.random_downsample_point_cloud(first_point_cloud_L, 65536)
first_point_cloud_R = PtsUtil.random_downsample_point_cloud(first_point_cloud_R, 65536)
first_overlap_points = DataLoadUtil.get_overlapping_points(first_point_cloud_L, first_point_cloud_R)
first_overlap_points = PtsUtil.get_overlapping_points(first_point_cloud_L, first_point_cloud_R)
first_downsampled_target_point_cloud = PtsUtil.random_downsample_point_cloud(first_overlap_points, self.pts_num)
first_to_world_rot_6d = PoseUtil.matrix_to_rotation_6d_numpy(np.asarray(first_left_cam_pose[:3,:3]))

View File

@ -22,12 +22,10 @@ class PointNetEncoder(nn.Module):
self.conv2 = torch.nn.Conv1d(64, 128, 1)
self.conv3 = torch.nn.Conv1d(128, 512, 1)
self.conv4 = torch.nn.Conv1d(512, self.out_dim , 1)
self.global_feat = config["global_feat"]
if self.feature_transform:
self.f_stn = STNkd(k=64)
def forward(self, x):
n_pts = x.shape[2]
trans = self.stn(x)
x = x.transpose(2, 1)
x = torch.bmm(x, trans)
@ -46,20 +44,15 @@ class PointNetEncoder(nn.Module):
x = self.conv4(x)
x = torch.max(x, 2, keepdim=True)[0]
x = x.view(-1, self.out_dim)
if self.global_feat:
return x
else:
x = x.view(-1, self.out_dim, 1).repeat(1, 1, n_pts)
return torch.cat([x, point_feat], 1)
return x, point_feat
def encode_points(self, pts):
def encode_points(self, pts, require_per_point_feat=False):
pts = pts.transpose(2, 1)
if not self.global_feat:
pts_feature = self(pts).transpose(2, 1)
global_pts_feature, per_point_feature = self(pts)
if require_per_point_feat:
return global_pts_feature, per_point_feature.transpose(2, 1)
else:
pts_feature = self(pts)
return pts_feature
return global_pts_feature
class STNkd(nn.Module):
def __init__(self, k=64):
@ -102,21 +95,13 @@ if __name__ == "__main__":
config = {
"in_dim": 3,
"out_dim": 1024,
"global_feat": True,
"feature_transform": False
}
pointnet_global = PointNetEncoder(config)
out = pointnet_global.encode_points(sim_data)
pointnet = PointNetEncoder(config)
out = pointnet.encode_points(sim_data)
print("global feat", out.size())
config = {
"in_dim": 3,
"out_dim": 1024,
"global_feat": False,
"feature_transform": False
}
pointnet = PointNetEncoder(config)
out = pointnet.encode_points(sim_data)
out, per_point_out = pointnet.encode_points(sim_data, require_per_point_feat=True)
print("point feat", out.size())
print("per point feat", per_point_out.size())

View File

@ -0,0 +1,20 @@
from torch import nn
import PytorchBoot.stereotype as stereotype
@stereotype.module("pts_num_encoder")
class PointsNumEncoder(nn.Module):
def __init__(self, config):
super(PointsNumEncoder, self).__init__()
self.config = config
out_dim = config["out_dim"]
self.act = nn.ReLU(True)
self.pts_num_encoder = nn.Sequential(
nn.Linear(1, out_dim),
self.act,
nn.Linear(out_dim, out_dim),
self.act,
)
def encode_pts_num(self, num_seq):
return self.pts_num_encoder(num_seq)

View File

@ -1,63 +0,0 @@
import torch
from torch import nn
from torch.nn.utils.rnn import pad_sequence
import PytorchBoot.stereotype as stereotype
@stereotype.module("transformer_pose_seq_encoder")
class TransformerPoseSequenceEncoder(nn.Module):
def __init__(self, config):
super(TransformerPoseSequenceEncoder, self).__init__()
self.config = config
embed_dim = config["pose_embed_dim"]
encoder_layer = nn.TransformerEncoderLayer(
d_model=embed_dim,
nhead=config["num_heads"],
dim_feedforward=config["ffn_dim"],
batch_first=True,
)
self.transformer_encoder = nn.TransformerEncoder(
encoder_layer, num_layers=config["num_layers"]
)
self.fc = nn.Linear(embed_dim, config["output_dim"])
def encode_sequence(self, pose_embedding_list_batch):
lengths = []
for pose_embedding_list in pose_embedding_list_batch:
lengths.append(len(pose_embedding_list))
combined_tensor = pad_sequence(pose_embedding_list_batch, batch_first=True) # Shape: [batch_size, max_seq_len, embed_dim]
max_len = max(lengths)
padding_mask = torch.tensor([([0] * length + [1] * (max_len - length)) for length in lengths], dtype=torch.bool).to(combined_tensor.device)
transformer_output = self.transformer_encoder(combined_tensor, src_key_padding_mask=padding_mask)
final_feature = transformer_output.mean(dim=1)
final_output = self.fc(final_feature)
return final_output
if __name__ == "__main__":
config = {
"pose_embed_dim": 256,
"num_heads": 4,
"ffn_dim": 256,
"num_layers": 3,
"output_dim": 1024,
}
encoder = TransformerPoseSequenceEncoder(config)
seq_len = [5, 8, 9, 4]
batch_size = 4
pose_embedding_list_batch = [
torch.randn(seq_len[idx], config["pose_embed_dim"]) for idx in range(batch_size)
]
output_feature = encoder.encode_sequence(
pose_embedding_list_batch
)
print("Encoded Feature:", output_feature)
print("Feature Shape:", output_feature.shape)

View File

@ -9,7 +9,7 @@ class TransformerSequenceEncoder(nn.Module):
def __init__(self, config):
super(TransformerSequenceEncoder, self).__init__()
self.config = config
embed_dim = config["pts_embed_dim"] + config["pose_embed_dim"]
embed_dim = config["embed_dim"]
encoder_layer = nn.TransformerEncoderLayer(
d_model=embed_dim,
nhead=config["num_heads"],
@ -21,24 +21,19 @@ class TransformerSequenceEncoder(nn.Module):
)
self.fc = nn.Linear(embed_dim, config["output_dim"])
def encode_sequence(self, pts_embedding_list_batch, pose_embedding_list_batch):
combined_features_batch = []
def encode_sequence(self, embedding_list_batch):
lengths = []
for embedding_list in embedding_list_batch:
lengths.append(len(embedding_list))
for pts_embedding_list, pose_embedding_list in zip(pts_embedding_list_batch, pose_embedding_list_batch):
combined_features = [
torch.cat((pts_embed, pose_embed), dim=-1)
for pts_embed, pose_embed in zip(pts_embedding_list, pose_embedding_list)
]
combined_features_batch.append(torch.stack(combined_features))
lengths.append(len(combined_features))
combined_tensor = pad_sequence(combined_features_batch, batch_first=True) # Shape: [batch_size, max_seq_len, embed_dim]
embedding_tensor = pad_sequence(embedding_list_batch, batch_first=True) # Shape: [batch_size, max_seq_len, embed_dim]
max_len = max(lengths)
padding_mask = torch.tensor([([0] * length + [1] * (max_len - length)) for length in lengths], dtype=torch.bool).to(combined_tensor.device)
padding_mask = torch.tensor([([0] * length + [1] * (max_len - length)) for length in lengths], dtype=torch.bool).to(embedding_tensor.device)
transformer_output = self.transformer_encoder(combined_tensor, src_key_padding_mask=padding_mask)
transformer_output = self.transformer_encoder(embedding_tensor, src_key_padding_mask=padding_mask)
final_feature = transformer_output.mean(dim=1)
final_output = self.fc(final_feature)
@ -47,26 +42,22 @@ class TransformerSequenceEncoder(nn.Module):
if __name__ == "__main__":
config = {
"pts_embed_dim": 1024,
"pose_embed_dim": 256,
"embed_dim": 256,
"num_heads": 4,
"ffn_dim": 256,
"num_layers": 3,
"output_dim": 2048,
"output_dim": 1024,
}
encoder = TransformerSequenceEncoder(config)
seq_len = [5, 8, 9, 4]
batch_size = 4
pts_embedding_list_batch = [
torch.randn(seq_len[idx], config["pts_embed_dim"]) for idx in range(batch_size)
]
pose_embedding_list_batch = [
torch.randn(seq_len[idx], config["pose_embed_dim"]) for idx in range(batch_size)
embedding_list_batch = [
torch.randn(seq_len[idx], config["embed_dim"]) for idx in range(batch_size)
]
output_feature = encoder.encode_sequence(
pts_embedding_list_batch, pose_embedding_list_batch
embedding_list_batch
)
print("Encoded Feature:", output_feature)
print("Feature Shape:", output_feature.shape)

177
preprocess/preprocessor.py Normal file
View File

@ -0,0 +1,177 @@
import os
import json
import numpy as np
import sys
np.random.seed(0)
# append parent directory to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
print(sys.path)
from utils.reconstruction import ReconstructionUtil
from utils.data_load import DataLoadUtil
from utils.pts import PtsUtil
def save_np_pts(path, pts: np.ndarray, file_type="txt"):
if file_type == "txt":
np.savetxt(path, pts)
else:
np.save(path, pts)
def save_full_points(root, scene, frame_idx, full_points: np.ndarray, file_type="txt"):
pts_path = os.path.join(root,scene, "scene_pts", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "scene_pts")):
os.makedirs(os.path.join(root,scene, "scene_pts"))
save_np_pts(pts_path, full_points, file_type)
def save_target_points(root, scene, frame_idx, target_points: np.ndarray, file_type="txt"):
pts_path = os.path.join(root,scene, "target_pts", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "target_pts")):
os.makedirs(os.path.join(root,scene, "target_pts"))
save_np_pts(pts_path, target_points, file_type)
def save_mask_idx(root, scene, frame_idx, mask_idx: np.ndarray,filtered_idx, file_type="txt"):
indices_path = os.path.join(root,scene, "mask_idx", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "mask_idx")):
os.makedirs(os.path.join(root,scene, "mask_idx"))
save_np_pts(indices_path, mask_idx, file_type)
filtered_path = os.path.join(root,scene, "mask_idx", f"{frame_idx}_filtered.{file_type}")
save_np_pts(filtered_path, filtered_idx, file_type)
def save_scan_points_indices(root, scene, frame_idx, scan_points_indices: np.ndarray, file_type="txt"):
indices_path = os.path.join(root,scene, "scan_points_indices", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "scan_points_indices")):
os.makedirs(os.path.join(root,scene, "scan_points_indices"))
save_np_pts(indices_path, scan_points_indices, file_type)
def save_scan_points(root, scene, scan_points: np.ndarray):
scan_points_path = os.path.join(root,scene, "scan_points.txt")
save_np_pts(scan_points_path, scan_points)
def get_world_points(depth, cam_intrinsic, cam_extrinsic):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
points_camera_aug = np.concatenate((points_camera, np.ones((points_camera.shape[0], 1))), axis=-1)
points_camera_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3]
return points_camera_world
def get_world_normals(normals, cam_extrinsic):
normals = normals / np.linalg.norm(normals, axis=1, keepdims=True)
normals_world = np.dot(cam_extrinsic[:3, :3], normals.T).T
return normals_world
def get_scan_points_indices(scan_points, mask, display_table_mask_label, cam_intrinsic, cam_extrinsic):
scan_points_homogeneous = np.hstack((scan_points, np.ones((scan_points.shape[0], 1))))
points_camera = np.dot(cam_extrinsic, scan_points_homogeneous.T).T[:, :3]
points_image_homogeneous = np.dot(cam_intrinsic, points_camera.T).T
points_image_homogeneous /= points_image_homogeneous[:, 2:]
pixel_x = points_image_homogeneous[:, 0].astype(int)
pixel_y = points_image_homogeneous[:, 1].astype(int)
h, w = mask.shape[:2]
valid_indices = (pixel_x >= 0) & (pixel_x < w) & (pixel_y >= 0) & (pixel_y < h)
mask_colors = mask[pixel_y[valid_indices], pixel_x[valid_indices]]
selected_points_indices = mask_colors == display_table_mask_label
return selected_points_indices
def save_scene_data(root, scene, scene_idx=0, scene_total=1):
''' configuration '''
target_mask_label = (0, 255, 0, 255)
display_table_mask_label=(0, 0, 255, 255)
random_downsample_N = 65536
train_input_pts_num = 8192
voxel_size=0.002
filter_degree = 75
''' scan points '''
display_table_info = DataLoadUtil.get_display_table_info(root, scene)
radius = display_table_info["radius"]
scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius))
''' read frame data(depth|mask|normal) '''
frame_num = DataLoadUtil.get_scene_seq_length(root, scene)
for frame_id in range(frame_num):
print(f"[scene({scene_idx}/{scene_total})|frame({frame_id}/{frame_num})]Processing {scene} frame {frame_id}")
path = DataLoadUtil.get_path(root, scene, frame_id)
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
depth_L, depth_R = DataLoadUtil.load_depth(
path, cam_info["near_plane"],
cam_info["far_plane"],
binocular=True
)
mask_L = DataLoadUtil.load_seg(path, binocular=True, left_only=True)
normal_L = DataLoadUtil.load_normal(path, binocular=True, left_only=True)
''' scene points '''
scene_points_L = get_world_points(depth_L, cam_info["cam_intrinsic"], cam_info["cam_to_world"])
scene_points_R = get_world_points(depth_R, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"])
sampled_scene_points_L, random_sample_idx_L = PtsUtil.random_downsample_point_cloud(
scene_points_L, random_downsample_N, require_idx=True
)
sampled_scene_points_R = PtsUtil.random_downsample_point_cloud(
scene_points_R, random_downsample_N
)
scene_overlap_points, overlap_idx_L = PtsUtil.get_overlapping_points(
sampled_scene_points_L, sampled_scene_points_R, voxel_size, require_idx=True
)
if scene_overlap_points.shape[0] < 1024:
scene_overlap_points = sampled_scene_points_L
overlap_idx_L = np.arange(sampled_scene_points_L.shape[0])
train_input_points, train_input_idx = PtsUtil.random_downsample_point_cloud(
scene_overlap_points, train_input_pts_num, require_idx=True
)
''' target points '''
mask_img = mask_L
mask_L = mask_L.reshape(-1, 4)
mask_L = (mask_L == target_mask_label).all(axis=-1)
mask_overlap = mask_L[random_sample_idx_L][overlap_idx_L]
scene_normals_L = normal_L.reshape(-1, 3)
target_overlap_normals = scene_normals_L[random_sample_idx_L][overlap_idx_L][mask_overlap]
target_normals = get_world_normals(target_overlap_normals, cam_info["cam_to_world"])
target_points = scene_overlap_points[mask_overlap]
filtered_target_points, filtered_idx = PtsUtil.filter_points(
target_points, target_normals, cam_info["cam_to_world"], filter_degree, require_idx=True
)
''' train_input_mask '''
mask_train_input = mask_overlap[train_input_idx]
''' scan points indices '''
scan_points_indices = get_scan_points_indices(scan_points, mask_img, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world"])
save_full_points(root, scene, frame_id, train_input_points)
save_target_points(root, scene, frame_id, filtered_target_points)
save_mask_idx(root, scene, frame_id, mask_train_input, filtered_idx=filtered_idx)
save_scan_points_indices(root, scene, frame_id, scan_points_indices)
save_scan_points(root, scene, scan_points) # The "done" flag of scene preprocess
if __name__ == "__main__":
#root = "/media/hofee/repository/new_data_with_normal"
root = "/media/hofee/repository/test_sample"
list_path = "/media/hofee/repository/test_sample/test_sample_list.txt"
scene_list = []
with open(list_path, "r") as f:
for line in f:
scene_list.append(line.strip())
from_idx = 0
to_idx = len(scene_list)
cnt = 0
total = to_idx - from_idx
for scene in scene_list[from_idx:to_idx]:
save_scene_data(root, scene, cnt, total)
cnt+=1

View File

@ -35,30 +35,34 @@ class StrategyGenerator(Runner):
def run(self):
dataset_name_list = ConfigManager.get("runner", "generate", "dataset_list")
voxel_threshold, overlap_threshold = ConfigManager.get("runner","generate","voxel_threshold"), ConfigManager.get("runner","generate","overlap_threshold")
voxel_threshold, soft_overlap_threshold, hard_overlap_threshold = ConfigManager.get("runner","generate","voxel_threshold"), ConfigManager.get("runner","generate","soft_overlap_threshold"), ConfigManager.get("runner","generate","hard_overlap_threshold")
for dataset_idx in range(len(dataset_name_list)):
dataset_name = dataset_name_list[dataset_idx]
status_manager.set_progress("generate_strategy", "strategy_generator", "dataset", dataset_idx, len(dataset_name_list))
root_dir = ConfigManager.get("datasets", dataset_name, "root_dir")
model_dir = ConfigManager.get("datasets", dataset_name, "model_dir")
from_idx = ConfigManager.get("datasets",dataset_name,"from")
to_idx = ConfigManager.get("datasets",dataset_name,"to")
scene_name_list = os.listdir(root_dir)
if to_idx == -1:
to_idx = len(scene_name_list)
cnt = 0
total = len(scene_name_list)
for scene_name in scene_name_list:
total = len(scene_name_list[from_idx:to_idx])
Log.info(f"Processing Dataset: {dataset_name}, From: {from_idx}, To: {to_idx}")
for scene_name in scene_name_list[from_idx:to_idx]:
Log.info(f"({dataset_name})Processing [{cnt}/{total}]: {scene_name}")
status_manager.set_progress("generate_strategy", "strategy_generator", "scene", cnt, total)
diag = DataLoadUtil.get_bbox_diag(model_dir, scene_name)
voxel_threshold = diag*0.02
status_manager.set_status("generate_strategy", "strategy_generator", "voxel_threshold", voxel_threshold)
status_manager.set_status("generate_strategy", "strategy_generator", "diagonal", diag)
output_label_path = DataLoadUtil.get_label_path(root_dir, scene_name,0)
if os.path.exists(output_label_path) and not self.overwrite:
Log.info(f"Scene <{scene_name}> Already Exists, Skip")
cnt += 1
continue
try:
self.generate_sequence(root_dir, model_dir, scene_name,voxel_threshold, overlap_threshold)
except Exception as e:
Log.error(f"Scene <{scene_name}> Failed, Error: {e}")
self.generate_sequence(root_dir, model_dir, scene_name,voxel_threshold, soft_overlap_threshold, hard_overlap_threshold)
# except Exception as e:
# Log.error(f"Scene <{scene_name}> Failed, Error: {e}")
cnt += 1
status_manager.set_progress("generate_strategy", "strategy_generator", "scene", total, total)
status_manager.set_progress("generate_strategy", "strategy_generator", "dataset", len(dataset_name_list), len(dataset_name_list))
@ -71,43 +75,36 @@ class StrategyGenerator(Runner):
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def generate_sequence(self, root, model_dir, scene_name, voxel_threshold, overlap_threshold):
def generate_sequence(self, root, model_dir, scene_name, voxel_threshold, soft_overlap_threshold, hard_overlap_threshold):
status_manager.set_status("generate_strategy", "strategy_generator", "scene", scene_name)
frame_num = DataLoadUtil.get_scene_seq_length(root, scene_name)
model_points_normals = DataLoadUtil.load_points_normals(root, scene_name)
model_pts = model_points_normals[:,:3]
down_sampled_model_pts = PtsUtil.voxel_downsample_point_cloud(model_pts, voxel_threshold)
pts_list = []
scan_points_indices_list = []
non_zero_cnt = 0
for frame_idx in range(frame_num):
if self.load_pts and os.path.exists(os.path.join(root,scene_name, "pts", f"{frame_idx}.txt")):
sampled_point_cloud = np.loadtxt(os.path.join(root,scene_name, "pts", f"{frame_idx}.txt"))
status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_idx, frame_num)
pts_list.append(sampled_point_cloud)
continue
else:
path = DataLoadUtil.get_path(root, scene_name, frame_idx)
cam_params = DataLoadUtil.load_cam_info(path, binocular=True)
status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_idx, frame_num)
point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
sampled_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=self.filter_degree)
if self.save_pts:
pts_dir = os.path.join(root,scene_name, "pts")
if not os.path.exists(pts_dir):
os.makedirs(pts_dir)
np.savetxt(os.path.join(pts_dir, f"{frame_idx}.txt"), sampled_point_cloud)
pts_list.append(sampled_point_cloud)
status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_idx, frame_num)
pts_path = os.path.join(root,scene_name, "target_pts", f"{frame_idx}.txt")
sampled_point_cloud = np.loadtxt(pts_path)
indices = None # ReconstructionUtil.compute_covered_scan_points(scan_points, display_table_pts)
pts_list.append(sampled_point_cloud)
scan_points_indices_list.append(indices)
status_manager.set_progress("generate_strategy", "strategy_generator", "loading frame", frame_num, frame_num)
seq_num = min(self.seq_num, len(pts_list))
init_view_list = range(seq_num)
seq_num = min(self.seq_num, non_zero_cnt)
init_view_list = []
for i in range(seq_num):
if pts_list[i].shape[0] < 100:
continue
init_view_list.append(i)
seq_idx = 0
for init_view in init_view_list:
status_manager.set_progress("generate_strategy", "strategy_generator", "computing sequence", seq_idx, len(init_view_list))
limited_useful_view, _, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(down_sampled_model_pts, pts_list,init_view=init_view, threshold=voxel_threshold, overlap_threshold=overlap_threshold, status_info=self.status_info)
limited_useful_view, _, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(down_sampled_model_pts, pts_list, scan_points_indices_list = scan_points_indices_list,init_view=init_view,
threshold=voxel_threshold, soft_overlap_threshold=soft_overlap_threshold, hard_overlap_threshold= hard_overlap_threshold, scan_points_threshold=10, status_info=self.status_info)
data_pairs = self.generate_data_pairs(limited_useful_view)
seq_save_data = {
"data_pairs": data_pairs,

View File

@ -6,56 +6,61 @@ import trimesh
import torch
from utils.pts import PtsUtil
class DataLoadUtil:
TABLE_POSITION = np.asarray([0,0,0.8215])
TABLE_POSITION = np.asarray([0, 0, 0.8215])
@staticmethod
def get_display_table_info(root, scene_name):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
display_table_info = scene_info["display_table"]
return display_table_info
@staticmethod
def get_display_table_top(root, scene_name):
display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)["height"]
display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray([0,0,display_table_height])
display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)[
"height"
]
display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray(
[0, 0, display_table_height]
)
return display_table_top
@staticmethod
def get_path(root, scene_name, frame_idx):
path = os.path.join(root, scene_name, f"{frame_idx}")
return path
@staticmethod
def get_label_num(root, scene_name):
label_dir = os.path.join(root,scene_name,"label")
label_dir = os.path.join(root, scene_name, "label")
return len(os.listdir(label_dir))
@staticmethod
def get_label_path(root, scene_name, seq_idx):
label_dir = os.path.join(root,scene_name,"label")
label_dir = os.path.join(root, scene_name, "label")
if not os.path.exists(label_dir):
os.makedirs(label_dir)
path = os.path.join(label_dir,f"{seq_idx}.json")
path = os.path.join(label_dir, f"{seq_idx}.json")
return path
@staticmethod
def get_label_path_old(root, scene_name):
path = os.path.join(root,scene_name,"label.json")
path = os.path.join(root, scene_name, "label.json")
return path
@staticmethod
def get_scene_seq_length(root, scene_name):
camera_params_path = os.path.join(root, scene_name, "camera_params")
return len(os.listdir(camera_params_path))
@staticmethod
def load_mesh_at(model_dir, object_name, world_object_pose):
model_path = os.path.join(model_dir, object_name, "mesh.obj")
mesh = trimesh.load(model_path)
mesh.apply_transform(world_object_pose)
return mesh
@staticmethod
def get_bbox_diag(model_dir, object_name):
model_path = os.path.join(model_dir, object_name, "mesh.obj")
@ -63,8 +68,7 @@ class DataLoadUtil:
bbox = mesh.bounding_box.extents
diagonal_length = np.linalg.norm(bbox)
return diagonal_length
@staticmethod
def save_mesh_at(model_dir, output_dir, object_name, scene_name, world_object_pose):
mesh = DataLoadUtil.load_mesh_at(model_dir, object_name, world_object_pose)
@ -72,12 +76,16 @@ class DataLoadUtil:
mesh.export(model_path)
@staticmethod
def save_target_mesh_at_world_space(root, model_dir, scene_name, display_table_as_world_space_origin=True):
def save_target_mesh_at_world_space(
root, model_dir, scene_name, display_table_as_world_space_origin=True
):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
target_name = scene_info["target_name"]
transformation = scene_info[target_name]
if display_table_as_world_space_origin:
location = transformation["location"] - DataLoadUtil.get_display_table_top(root, scene_name)
location = transformation["location"] - DataLoadUtil.get_display_table_top(
root, scene_name
)
else:
location = transformation["location"]
rotation_euler = transformation["rotation_euler"]
@ -90,14 +98,21 @@ class DataLoadUtil:
os.makedirs(mesh_dir)
model_path = os.path.join(mesh_dir, "world_target_mesh.obj")
mesh.export(model_path)
@staticmethod
def load_scene_info(root, scene_name):
scene_info_path = os.path.join(root, scene_name, "scene_info.json")
with open(scene_info_path, "r") as f:
scene_info = json.load(f)
return scene_info
@staticmethod
def load_target_pts_num_dict(root, scene_name):
target_pts_num_path = os.path.join(root, scene_name, "target_pts_num.json")
with open(target_pts_num_path, "r") as f:
target_pts_num_dict = json.load(f)
return target_pts_num_dict
@staticmethod
def load_target_object_pose(root, scene_name):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
@ -108,10 +123,10 @@ class DataLoadUtil:
pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
pose_mat[:3, 3] = location
return pose_mat
@staticmethod
def load_depth(path, min_depth=0.01,max_depth=5.0,binocular=False):
def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False):
def load_depth_from_real_path(real_path, min_depth, max_depth):
depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
depth = depth.astype(np.float32) / 65535.0
@ -119,78 +134,133 @@ class DataLoadUtil:
max_depth = max_depth
depth_meters = min_depth + (max_depth - min_depth) * depth
return depth_meters
if binocular:
depth_path_L = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_L.png")
depth_path_R = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + "_R.png")
depth_meters_L = load_depth_from_real_path(depth_path_L, min_depth, max_depth)
depth_meters_R = load_depth_from_real_path(depth_path_R, min_depth, max_depth)
depth_path_L = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + "_L.png"
)
depth_path_R = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + "_R.png"
)
depth_meters_L = load_depth_from_real_path(
depth_path_L, min_depth, max_depth
)
depth_meters_R = load_depth_from_real_path(
depth_path_R, min_depth, max_depth
)
return depth_meters_L, depth_meters_R
else:
depth_path = os.path.join(os.path.dirname(path), "depth", os.path.basename(path) + ".png")
depth_path = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + ".png"
)
depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
return depth_meters
@staticmethod
def load_seg(path, binocular=False):
if binocular:
def load_seg(path, binocular=False, left_only=False):
if binocular and not left_only:
def clean_mask(mask_image):
green = [0, 255, 0, 255]
red = [255, 0, 0, 255]
threshold = 2
mask_image = np.where(np.abs(mask_image - green) <= threshold, green, mask_image)
mask_image = np.where(np.abs(mask_image - red) <= threshold, red, mask_image)
mask_image = np.where(
np.abs(mask_image - green) <= threshold, green, mask_image
)
mask_image = np.where(
np.abs(mask_image - red) <= threshold, red, mask_image
)
return mask_image
mask_path_L = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_L.png")
mask_path_L = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
)
mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
mask_path_R = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + "_R.png")
mask_path_R = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_R.png"
)
mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
return mask_image_L, mask_image_R
else:
mask_path = os.path.join(os.path.dirname(path), "mask", os.path.basename(path) + ".png")
mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
if binocular and left_only:
mask_path = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
)
else:
mask_path = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + ".png"
)
mask_image = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
return mask_image
@staticmethod
def load_normal(path, binocular=False, left_only=False):
if binocular and not left_only:
normal_path_L = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
normal_image_L = cv2.imread(normal_path_L, cv2.IMREAD_COLOR)
normal_path_R = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_R.png"
)
normal_image_R = cv2.imread(normal_path_R, cv2.IMREAD_COLOR)
return normal_image_L[:3,:3], normal_image_R[:3,:3]
else:
if binocular and left_only:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
else:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + ".png"
)
normal_image = cv2.imread(normal_path, cv2.IMREAD_COLOR)
return normal_image
@staticmethod
def load_label(path):
with open(path, 'r') as f:
with open(path, "r") as f:
label_data = json.load(f)
return label_data
@staticmethod
def load_rgb(path):
rgb_path = os.path.join(os.path.dirname(path), "rgb", os.path.basename(path) + ".png")
rgb_path = os.path.join(
os.path.dirname(path), "rgb", os.path.basename(path) + ".png"
)
rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
return rgb_image
@staticmethod
def load_from_preprocessed_pts(path):
npy_path = os.path.join(os.path.dirname(path), "points", os.path.basename(path) + ".npy")
npy_path = os.path.join(
os.path.dirname(path), "points", os.path.basename(path) + ".npy"
)
pts = np.load(npy_path)
return pts
@staticmethod
def cam_pose_transformation(cam_pose_before):
offset = np.asarray([
[1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, -1, 0],
[0, 0, 0, 1]])
cam_pose_after = cam_pose_before @ offset
offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
cam_pose_after = cam_pose_before @ offset
return cam_pose_after
@staticmethod
def load_cam_info(path, binocular=False, display_table_as_world_space_origin=True):
scene_dir = os.path.dirname(path)
root_dir = os.path.dirname(scene_dir)
scene_name = os.path.basename(scene_dir)
camera_params_path = os.path.join(os.path.dirname(path), "camera_params", os.path.basename(path) + ".json")
with open(camera_params_path, 'r') as f:
camera_params_path = os.path.join(
os.path.dirname(path), "camera_params", os.path.basename(path) + ".json"
)
with open(camera_params_path, "r") as f:
label_data = json.load(f)
cam_to_world = np.asarray(label_data["extrinsic"])
cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
world_to_display_table = np.eye(4)
world_to_display_table[:3, 3] = - DataLoadUtil.get_display_table_top(root_dir, scene_name)
world_to_display_table[:3, 3] = -DataLoadUtil.get_display_table_top(
root_dir, scene_name
)
if display_table_as_world_space_origin:
cam_to_world = np.dot(world_to_display_table, cam_to_world)
cam_intrinsic = np.asarray(label_data["intrinsic"])
@ -198,7 +268,7 @@ class DataLoadUtil:
"cam_to_world": cam_to_world,
"cam_intrinsic": cam_intrinsic,
"far_plane": label_data["far_plane"],
"near_plane": label_data["near_plane"]
"near_plane": label_data["near_plane"],
}
if binocular:
cam_to_world_R = np.asarray(label_data["extrinsic_R"])
@ -211,104 +281,127 @@ class DataLoadUtil:
cam_info["cam_to_world_O"] = cam_to_world_O
cam_info["cam_to_world_R"] = cam_to_world_R
return cam_info
@staticmethod
def get_real_cam_O_from_cam_L(cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True):
def get_real_cam_O_from_cam_L(
cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True
):
root_dir = os.path.dirname(scene_path)
scene_name = os.path.basename(scene_path)
if isinstance(cam_L, torch.Tensor):
cam_L = cam_L.cpu().numpy()
nO_to_display_table_pose = cam_L @ cam_O_to_cam_L
nO_to_display_table_pose = cam_L @ cam_O_to_cam_L
if display_table_as_world_space_origin:
display_table_to_world = np.eye(4)
display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top(root_dir, scene_name)
display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top(
root_dir, scene_name
)
nO_to_world_pose = np.dot(display_table_to_world, nO_to_display_table_pose)
nO_to_world_pose = DataLoadUtil.cam_pose_transformation(nO_to_world_pose)
return nO_to_world_pose
@staticmethod
def get_target_point_cloud(depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0,255,0,255)):
def get_target_point_cloud(
depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255), require_full_points=False
):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy')
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
mask = mask.reshape(-1,4)
target_mask = (mask == target_mask_label).all(axis=-1)
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
mask = mask.reshape(-1, 4)
target_mask = (mask == target_mask_label).all(axis=-1)
target_points_camera = points_camera[target_mask]
target_points_camera_aug = np.concatenate([target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1)
target_points_camera_aug = np.concatenate(
[target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1
)
target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3]
return {
data = {
"points_world": target_points_world,
"points_camera": target_points_camera
"points_camera": target_points_camera,
}
return data
@staticmethod
def get_point_cloud(depth, cam_intrinsic, cam_extrinsic):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing='xy')
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
points_camera_aug = np.concatenate([points_camera, np.ones((points_camera.shape[0], 1))], axis=-1)
points_camera_aug = np.concatenate(
[points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
)
points_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3]
return {
"points_world": points_world,
"points_camera": points_camera
}
return {"points_world": points_world, "points_camera": points_camera}
@staticmethod
def get_target_point_cloud_world_from_path(path, binocular=False, random_downsample_N=65536, voxel_size = 0.005, target_mask_label=(0,255,0,255)):
def get_target_point_cloud_world_from_path(
path,
binocular=False,
random_downsample_N=65536,
voxel_size=0.005,
target_mask_label=(0, 255, 0, 255),
display_table_mask_label=(0, 0, 255, 255),
get_display_table_pts=False,
require_normal=False,
):
cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
if binocular:
depth_L, depth_R = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'], binocular=True)
depth_L, depth_R = DataLoadUtil.load_depth(
path, cam_info["near_plane"], cam_info["far_plane"], binocular=True
)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
point_cloud_L = DataLoadUtil.get_target_point_cloud(depth_L, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask_L, target_mask_label)['points_world']
point_cloud_R = DataLoadUtil.get_target_point_cloud(depth_R, cam_info['cam_intrinsic'], cam_info['cam_to_world_R'], mask_R, target_mask_label)['points_world']
point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, random_downsample_N)
point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, random_downsample_N)
overlap_points = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size)
point_cloud_L = DataLoadUtil.get_target_point_cloud(
depth_L,
cam_info["cam_intrinsic"],
cam_info["cam_to_world"],
mask_L,
target_mask_label,
)["points_world"]
point_cloud_R = DataLoadUtil.get_target_point_cloud(
depth_R,
cam_info["cam_intrinsic"],
cam_info["cam_to_world_R"],
mask_R,
target_mask_label,
)["points_world"]
point_cloud_L = PtsUtil.random_downsample_point_cloud(
point_cloud_L, random_downsample_N
)
point_cloud_R = PtsUtil.random_downsample_point_cloud(
point_cloud_R, random_downsample_N
)
overlap_points = PtsUtil.get_overlapping_points(
point_cloud_L, point_cloud_R, voxel_size
)
return overlap_points
else:
depth = DataLoadUtil.load_depth(path, cam_info['near_plane'], cam_info['far_plane'])
depth = DataLoadUtil.load_depth(
path, cam_info["near_plane"], cam_info["far_plane"]
)
mask = DataLoadUtil.load_seg(path)
point_cloud = DataLoadUtil.get_target_point_cloud(depth, cam_info['cam_intrinsic'], cam_info['cam_to_world'], mask)['points_world']
point_cloud = DataLoadUtil.get_target_point_cloud(
depth, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask
)["points_world"]
return point_cloud
@staticmethod
def voxelize_points(points, voxel_size):
voxel_indices = np.floor(points / voxel_size).astype(np.int32)
unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True)
return unique_voxels
@staticmethod
def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005):
voxels_L, indices_L = DataLoadUtil.voxelize_points(point_cloud_L, voxel_size)
voxels_R, _ = DataLoadUtil.voxelize_points(point_cloud_R, voxel_size)
voxel_indices_L = voxels_L.view([('', voxels_L.dtype)]*3)
voxel_indices_R = voxels_R.view([('', voxels_R.dtype)]*3)
overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
mask_L = np.isin(indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0])
overlapping_points = point_cloud_L[mask_L]
return overlapping_points
@staticmethod
def load_points_normals(root, scene_name, display_table_as_world_space_origin=True):
points_path = os.path.join(root, scene_name, "points_and_normals.txt")
points_normals = np.loadtxt(points_path)
if display_table_as_world_space_origin:
points_normals[:,:3] = points_normals[:,:3] - DataLoadUtil.get_display_table_top(root, scene_name)
return points_normals
points_normals[:, :3] = points_normals[
:, :3
] - DataLoadUtil.get_display_table_top(root, scene_name)
return points_normals

View File

@ -18,11 +18,55 @@ class PtsUtil:
return points_h[:, :3]
@staticmethod
def random_downsample_point_cloud(point_cloud, num_points):
def random_downsample_point_cloud(point_cloud, num_points, require_idx=False):
if point_cloud.shape[0] == 0:
if require_idx:
return point_cloud, np.array([])
return point_cloud
idx = np.random.choice(len(point_cloud), num_points, replace=True)
if require_idx:
return point_cloud[idx], idx
return point_cloud[idx]
@staticmethod
def random_downsample_point_cloud_tensor(point_cloud, num_points):
idx = torch.randint(0, len(point_cloud), (num_points,))
return point_cloud[idx]
return point_cloud[idx]
@staticmethod
def voxelize_points(points, voxel_size):
voxel_indices = np.floor(points / voxel_size).astype(np.int32)
unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True)
return unique_voxels
@staticmethod
def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005, require_idx=False):
voxels_L, indices_L = PtsUtil.voxelize_points(point_cloud_L, voxel_size)
voxels_R, _ = PtsUtil.voxelize_points(point_cloud_R, voxel_size)
voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
mask_L = np.isin(
indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]
)
overlapping_points = point_cloud_L[mask_L]
if require_idx:
return overlapping_points, mask_L
return overlapping_points
@staticmethod
def filter_points(points, normals, cam_pose, theta=75, require_idx=False):
camera_axis = -cam_pose[:3, 2]
normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
cos_theta = np.dot(normals_normalized, camera_axis)
theta_rad = np.deg2rad(theta)
idx = cos_theta > np.cos(theta_rad)
print(cos_theta, theta_rad)
filtered_points= points[idx]
# ------ Debug Start ------
import ipdb;ipdb.set_trace()
# ------ Debug End ------
if require_idx:
return filtered_points, idx
return filtered_points

View File

@ -8,7 +8,7 @@ class ReconstructionUtil:
def compute_coverage_rate(target_point_cloud, combined_point_cloud, threshold=0.01):
kdtree = cKDTree(combined_point_cloud)
distances, _ = kdtree.query(target_point_cloud)
covered_points = np.sum(distances < threshold)
covered_points = np.sum(distances < threshold*2)
coverage_rate = covered_points / target_point_cloud.shape[0]
return coverage_rate
@ -17,7 +17,10 @@ class ReconstructionUtil:
kdtree = cKDTree(combined_point_cloud)
distances, _ = kdtree.query(new_point_cloud)
overlapping_points = np.sum(distances < threshold)
overlap_rate = overlapping_points / new_point_cloud.shape[0]
if new_point_cloud.shape[0] == 0:
overlap_rate = 0
else:
overlap_rate = overlapping_points / new_point_cloud.shape[0]
return overlap_rate
@staticmethod
@ -43,11 +46,23 @@ class ReconstructionUtil:
best_view = view_index
return best_view, best_coverage_increase
@staticmethod
def get_new_added_points(old_combined_pts, new_pts, threshold=0.005):
if old_combined_pts.size == 0:
return new_pts
if new_pts.size == 0:
return np.array([])
tree = cKDTree(old_combined_pts)
distances, _ = tree.query(new_pts, k=1)
new_added_points = new_pts[distances > threshold]
return new_added_points
@staticmethod
def compute_next_best_view_sequence_with_overlap(target_point_cloud, point_cloud_list,threshold=0.01, overlap_threshold=0.3, init_view = 0, status_info=None):
def compute_next_best_view_sequence_with_overlap(target_point_cloud, point_cloud_list, scan_points_indices_list, threshold=0.01, soft_overlap_threshold=0.5, hard_overlap_threshold=0.7, init_view = 0, scan_points_threshold=5, status_info=None):
selected_views = [point_cloud_list[init_view]]
combined_point_cloud = np.vstack(selected_views)
history_indices = [scan_points_indices_list[init_view]]
down_sampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_point_cloud,threshold)
new_coverage = ReconstructionUtil.compute_coverage_rate(target_point_cloud, down_sampled_combined_point_cloud, threshold)
current_coverage = new_coverage
@ -61,8 +76,17 @@ class ReconstructionUtil:
best_coverage_increase = -1
for view_index in remaining_views:
if point_cloud_list[view_index].shape[0] == 0:
continue
if selected_views:
new_scan_points_indices = scan_points_indices_list[view_index]
if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold):
overlap_threshold = hard_overlap_threshold
else:
overlap_threshold = soft_overlap_threshold
combined_old_point_cloud = np.vstack(selected_views)
down_sampled_old_point_cloud = PtsUtil.voxel_downsample_point_cloud(combined_old_point_cloud,threshold)
down_sampled_new_view_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud_list[view_index],threshold)
@ -85,6 +109,7 @@ class ReconstructionUtil:
break
selected_views.append(point_cloud_list[best_view])
remaining_views.remove(best_view)
history_indices.append(scan_points_indices_list[best_view])
current_coverage += best_coverage_increase
cnt_processed_view += 1
if status_info is not None:
@ -104,20 +129,41 @@ class ReconstructionUtil:
runner_name = status_info["runner_name"]
sm.set_progress(app_name, runner_name, "processed view", len(point_cloud_list), len(point_cloud_list))
return view_sequence, remaining_views, down_sampled_combined_point_cloud
@staticmethod
def filter_points(points, points_normals, cam_pose, voxel_size=0.005, theta=45):
sampled_points = PtsUtil.voxel_downsample_point_cloud(points, voxel_size)
kdtree = cKDTree(points_normals[:,:3])
_, indices = kdtree.query(sampled_points)
nearest_points = points_normals[indices]
normals = nearest_points[:, 3:]
camera_axis = -cam_pose[:3, 2]
normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
cos_theta = np.dot(normals_normalized, camera_axis)
theta_rad = np.deg2rad(theta)
filtered_sampled_points= sampled_points[cos_theta > np.cos(theta_rad)]
def generate_scan_points(display_table_top, display_table_radius, min_distance=0.03, max_points_num = 100, max_attempts = 1000):
points = []
attempts = 0
while len(points) < max_points_num and attempts < max_attempts:
angle = np.random.uniform(0, 2 * np.pi)
r = np.random.uniform(0, display_table_radius)
x = r * np.cos(angle)
y = r * np.sin(angle)
z = display_table_top
new_point = (x, y, z)
if all(np.linalg.norm(np.array(new_point) - np.array(existing_point)) >= min_distance for existing_point in points):
points.append(new_point)
attempts += 1
return points
@staticmethod
def compute_covered_scan_points(scan_points, point_cloud, threshold=0.01):
return filtered_sampled_points[:, :3]
tree = cKDTree(point_cloud)
covered_points = []
indices = []
for i, scan_point in enumerate(scan_points):
if tree.query_ball_point(scan_point, threshold):
covered_points.append(scan_point)
indices.append(i)
return covered_points, indices
@staticmethod
def check_scan_points_overlap(history_indices, indices2, threshold=5):
for indices1 in history_indices:
if len(set(indices1).intersection(set(indices2))) >= threshold:
return True
return False

View File

@ -33,12 +33,11 @@ class RenderUtil:
print(result.stderr)
return None
path = os.path.join(temp_dir, "tmp")
# ------ Debug Start ------
# import ipdb;ipdb.set_trace()
# ------ Debug End ------
point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
cam_params = DataLoadUtil.load_cam_info(path, binocular=True)
filtered_point_cloud = ReconstructionUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=filter_degree)
''' TODO: old code: filter_points api is changed, need to update the code '''
filtered_point_cloud = PtsUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=filter_degree)
full_scene_point_cloud = None
if require_full_scene:
depth_L, depth_R = DataLoadUtil.load_depth(path, cam_params['near_plane'], cam_params['far_plane'], binocular=True)
@ -47,7 +46,7 @@ class RenderUtil:
point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, 65536)
point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, 65536)
full_scene_point_cloud = DataLoadUtil.get_overlapping_points(point_cloud_L, point_cloud_R)
full_scene_point_cloud = PtsUtil.get_overlapping_points(point_cloud_L, point_cloud_R)
return filtered_point_cloud, full_scene_point_cloud