add close loop control

This commit is contained in:
hofee 2024-10-12 23:11:25 +08:00
parent 8d43d4de60
commit 07dcdb3452
4 changed files with 294 additions and 2 deletions

View File

@ -0,0 +1,46 @@
runner:
general:
seed: 1
device: cpu
cuda_visible_devices: "0,1,2,3,4,5,6,7"
experiment:
name: debug
root_dir: "experiments"
generate:
blender_bin_path: /home/yan20/Desktop/nbv_rec/project/blender_app/blender-4.2.2-linux-x64/blender
generator_script_path: /home/yan20/Desktop/nbv_rec/project/blender_app/data_generator.py
model_dir: "/home/yan20/Desktop/nbv_rec/data/models"
table_model_path: "/home/yan20/Desktop/nbv_rec/data/table.obj"
model_start_idx: 0
voxel_size: 0.002
max_shot_view_num: 50
min_shot_new_pts_num: 10
min_coverage_increase: 0.001
max_view: 512
min_view: 128
max_diag: 0.7
min_diag: 0.01
random_view_ratio: 0
min_cam_table_included_degree: 20
obj_name: "bear"
light_and_camera_config:
Camera:
near_plane: 0.01
far_plane: 5
fov_vertical: 25
resolution: [640,400]
eye_distance: 0.15
eye_angle: 25
Light:
location: [0,0,3.5]
orientation: [0,0,0]
power: 150
reconstruct:
soft_overlap_threshold: 0.3
hard_overlap_threshold: 0.6
scan_points_threshold: 10

View File

@ -0,0 +1,219 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data, save_scene_data_multithread
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
@stereotype.runner("CAD_close_loop_strategy_runner")
class CADCloseLoopStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_strategy")
self.status_info = {
"status_manager": status_manager,
"app_name": "cad",
"runner_name": "CAD_close_loop_strategy_runner",
}
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.model_dir = self.generate_config["model_dir"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_view = self.generate_config["max_view"]
self.min_view = self.generate_config["min_view"]
self.max_diag = self.generate_config["max_diag"]
self.min_diag = self.generate_config["min_diag"]
self.min_cam_table_included_degree = self.generate_config[
"min_cam_table_included_degree"
]
self.max_shot_view_num = self.generate_config["max_shot_view_num"]
self.min_shot_new_pts_num = self.generate_config["max_shot_new_pts_num"]
self.min_coverage_increase = self.generate_config["min_coverage_increase"]
self.random_view_ratio = self.generate_config["random_view_ratio"]
self.soft_overlap_threshold = self.reconstruct_config["soft_overlap_threshold"]
self.hard_overlap_threshold = self.reconstruct_config["hard_overlap_threshold"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0):
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
def run_one_model(self, model_name):
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
ControlUtil.connect_robot()
""" init robot """
Log.info("[Part 1/5] start init and register")
ControlUtil.init()
""" load CAD model """
model_path = os.path.join(self.model_dir, model_name, "mesh.ply")
temp_name = "cad_model_world"
cad_model = trimesh.load(model_path)
""" take first view """
Log.info("[Part 1/5] take first view data")
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_pts = ViewUtil.get_pts(view_data)
first_cam_to_real_world = ControlUtil.get_pose()
first_real_world_pts = PtsUtil.transform_point_cloud(
first_cam_pts, first_cam_to_real_world
)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
first_real_world_pts
)
np.savetxt(f"first_real_pts_{model_name}.txt", first_splitted_real_world_pts)
""" register """
Log.info("[Part 1/5] do registeration")
real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("[Part 1/5] finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world: trimesh.Trimesh = cad_model.apply_transform(
cad_to_real_world
)
cad_model_real_world.export(
os.path.join(temp_dir, f"real_world_{temp_name}.obj")
)
cad_model_blender_world: trimesh.Trimesh = cad_model.apply_transform(
real_world_to_blender_world
)
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
cad_model_blender_world.export(os.path.join(temp_dir, f"{temp_name}.obj"))
""" sample view """
Log.info("[Part 2/5] start running renderer")
subprocess.run(
[
self.blender_bin_path,
"-b",
"-P",
self.generator_script_path,
"--",
temp_dir,
],
capture_output=True,
text=True,
)
Log.success("[Part 2/5] finish running renderer")
""" preprocess """
Log.info("[Part 3/5] start preprocessing data")
save_scene_data(temp_dir, temp_name)
Log.success("[Part 3/5] finish preprocessing data")
pts_dir = os.path.join(temp_dir, temp_name, "pts")
sample_view_pts_list = []
scan_points_idx_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(temp_dir, temp_name, "pts", f"{frame_idx}.txt")
idx_path = os.path.join(
temp_dir, temp_name, "scan_points_indices", f"{frame_idx}.npy"
)
point_cloud = np.loadtxt(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(
point_cloud, self.voxel_size
)
indices = np.load(idx_path)
try:
len(indices)
except:
indices = np.array([indices])
sample_view_pts_list.append(sampled_point_cloud)
scan_points_idx_list.append(indices)
""" close-loop strategy """
scanned_pts = [first_real_world_pts]
history_indices = []
last_coverage = 0
last_covered_num = 0
Log.info("[Part 4/4] start close-loop control")
while True:
next_best_view, next_best_coverage, next_best_covered_num = (
ReconstructionUtil.compute_next_best_view_with_overlap(
scanned_pts,
sample_view_pts_list,
history_indices,
scan_points_idx_list,
threshold=self.voxel_size,
soft_overlap_threshold=self.soft_overlap_threshold,
hard_overlap_threshold=self.hard_overlap_threshold,
scan_points_threshold=self.scan_points_threshold,
)
)
nbv_path = DataLoadUtil.get_path(temp_dir, temp_name, next_best_view)
nbv_cam_info = DataLoadUtil.load_cam_info(nbv_path, binocular=True)
nbv_cam_to_world = nbv_cam_info["cam_to_world_O"]
ControlUtil.move_to(nbv_cam_to_world)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
world_shot_pts = ViewUtil.get_pts(view_data)
scanned_pts.append(world_shot_pts)
history_indices.append(scan_points_idx_list[next_best_view])
Log.info(
f"Current rec pts num: {len(scanned_pts)}, Best cover pts: {next_best_covered_num}, Best coverage: {next_best_coverage}"
)
coverage_rate_increase = next_best_coverage - last_coverage
if coverage_rate_increase < self.min_coverage_increase:
Log.info(f"Coverage rate = {coverage_rate_increase} < {self.min_coverage_increase}, stop scanning")
break
last_coverage = next_best_coverage
new_added_pts_num = next_best_covered_num - last_covered_num
if new_added_pts_num < self.min_shot_new_pts_num:
Log.info(f"New added pts num = {new_added_pts_num} < {self.min_shot_new_pts_num}, stop scanning")
break
last_covered_num = next_best_covered_num
if len(scanned_pts) >= self.max_shot_view_num:
Log.info(f"Scanned view num = {len(scanned_pts)} >= {self.max_shot_view_num}, stop scanning")
break
Log.success("[Part 4/4] finish close-loop control")
def run(self):
total = len(os.listdir(self.model_dir))
model_start_idx = self.generate_config["model_start_idx"]
count_object = model_start_idx
for model_name in os.listdir(self.model_dir[model_start_idx:]):
Log.info(f"[{count_object}/{total}]Processing {model_name}")
self.run_one_model(model_name)
Log.success(f"[{count_object}/{total}]Finished processing {model_name}")
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"C:\Users\hofee\Downloads\mesh.obj"
model = trimesh.load(model_path)

View File

@ -28,7 +28,7 @@ class CADStrategyRunner(Runner):
self.status_info = {
"status_manager": status_manager,
"app_name": "cad",
"runner_name": "cad_strategy"
"runner_name": "CAD_strategy_runner"
}
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")

View File

@ -121,7 +121,34 @@ class ReconstructionUtil:
sm.set_progress(app_name, runner_name, "processed view", len(point_cloud_list), len(point_cloud_list))
return view_sequence, remaining_views, combined_point_cloud
@staticmethod
def compute_next_best_view_with_overlap(scanned_pts:list, point_cloud_list, history_indices, scan_points_indices_list, threshold=0.01, soft_overlap_threshold=0.5, hard_overlap_threshold=0.7, scan_points_threshold=5):
max_rec_pts = np.vstack(point_cloud_list)
downsampled_max_rec_pts = PtsUtil.voxel_downsample_point_cloud(max_rec_pts, threshold)
best_view = None
best_coverage = -1
best_covered_num = 0
for view in range(len(point_cloud_list)):
if point_cloud_list[view].shape[0] == 0:
continue
new_scan_points_indices = scan_points_indices_list[view]
if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold):
overlap_threshold = hard_overlap_threshold
else:
overlap_threshold = soft_overlap_threshold
overlap_rate = ReconstructionUtil.compute_overlap_rate(point_cloud_list[view], scanned_pts, threshold)
if overlap_rate < overlap_threshold:
continue
new_combined_point_cloud = np.vstack(scanned_pts + [point_cloud_list[view]])
new_downsampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(new_combined_point_cloud,threshold)
new_coverage, new_covered_num = ReconstructionUtil.compute_coverage_rate(downsampled_max_rec_pts, new_downsampled_combined_point_cloud, threshold)
if new_coverage > best_coverage:
best_coverage = new_coverage
best_covered_num = new_covered_num
best_view = view
return best_view, best_coverage, best_covered_num
@staticmethod
def generate_scan_points(display_table_top, display_table_radius, min_distance=0.03, max_points_num = 500, max_attempts = 1000):
points = []