import os
import json

from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype

from utils.data_load import DataLoadUtil
from utils.reconstruction import ReconstructionUtil


@stereotype.runner("strategy_generator", comment="unfinished")
class StrategyGenerator(Runner):
    """Runner that writes one JSON label file per scene of each configured dataset.

    For every scene, the per-frame point clouds are downsampled, passed to
    ``ReconstructionUtil.compute_next_best_view_sequence_with_overlap`` and the
    resulting view sequence is saved together with the training pairs derived
    from it.
    """

    def __init__(self, config):
        """Initialise the base runner and load the ``"generate"`` experiment."""
        super().__init__(config)
        self.load_experiment("generate")

    def run(self):
        """Generate label files for every scene of every dataset in the config.

        Reads ``runner.dataset_list`` and the two thresholds under
        ``runner.generate`` from ``ConfigManager``; for each dataset reads its
        ``root_dir`` and ``output_dir`` under ``datasets.<name>``.
        """
        dataset_name_list = ConfigManager.get("runner", "dataset_list")
        voxel_threshold = ConfigManager.get("runner", "generate", "voxel_threshold")
        overlap_threshold = ConfigManager.get("runner", "generate", "overlap_threshold")

        for dataset_name in dataset_name_list:
            root_dir = ConfigManager.get("datasets", dataset_name, "root_dir")
            output_dir = ConfigManager.get("datasets", dataset_name, "output_dir")
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(output_dir, exist_ok=True)

            for scene_idx in DataLoadUtil.get_scene_idx_list(root_dir):
                self.generate_sequence(
                    root_dir, output_dir, scene_idx, voxel_threshold, overlap_threshold
                )

    def create_experiment(self, backup_name=None):
        """Create the experiment via the base class and add an ``output`` directory.

        The directory is created under ``self.experiment_path``. ``os.makedirs``
        is called without ``exist_ok``, so it raises ``FileExistsError`` if the
        directory is already present.
        """
        super().create_experiment(backup_name)
        output_dir = os.path.join(str(self.experiment_path), "output")
        os.makedirs(output_dir)

    def load_experiment(self, backup_name=None):
        """Load the experiment; delegates unchanged to the base class."""
        super().load_experiment(backup_name)

    def generate_sequence(self, root, output_dir, seq, voxel_threshold, overlap_threshold):
        """Compute the view sequence for one scene and write it as JSON.

        Args:
            root: Dataset root directory handed to the ``DataLoadUtil`` helpers.
            output_dir: Directory handed to ``DataLoadUtil.get_label_path``.
            seq: Scene identifier, as yielded by ``get_scene_idx_list``.
            voxel_threshold: Passed to ``downsample_point_cloud`` for every
                frame and, as ``threshold``, to the sequence computation.
            overlap_threshold: Passed through to the sequence computation.

        The written JSON object has the keys ``data_pairs``, ``best_sequence``
        and ``max_coverage_rate``.
        """
        frame_idx_list = DataLoadUtil.get_frame_idx_list(root, seq)
        model_pts = DataLoadUtil.load_model_points(root, seq)

        pts_list = []
        for frame_idx in frame_idx_list:
            path = DataLoadUtil.get_path(root, seq, frame_idx)
            point_cloud = DataLoadUtil.get_point_cloud_world_from_path(path)
            sampled_point_cloud = ReconstructionUtil.downsample_point_cloud(
                point_cloud, voxel_threshold
            )
            pts_list.append(sampled_point_cloud)

        # Only the first returned value is used; the second is discarded.
        limited_useful_view, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(
            model_pts,
            pts_list,
            threshold=voxel_threshold,
            overlap_threshold=overlap_threshold,
        )
        data_pairs = self.generate_data_pairs(limited_useful_view)

        # NOTE(review): presumably each entry is (view_index, coverage_rate), so
        # element [1] of the last entry is the final coverage -- confirm against
        # ReconstructionUtil. An empty sequence used to raise IndexError here and
        # abort the whole run; it is now recorded with a coverage of 0.0.
        if len(limited_useful_view) > 0:
            max_coverage_rate = limited_useful_view[-1][1]
        else:
            max_coverage_rate = 0.0

        seq_save_data = {
            "data_pairs": data_pairs,
            "best_sequence": limited_useful_view,
            "max_coverage_rate": max_coverage_rate,
        }

        output_label_path = DataLoadUtil.get_label_path(output_dir, seq)
        with open(output_label_path, 'w') as f:
            json.dump(seq_save_data, f)

    def generate_data_pairs(self, useful_view):
        """Pair every view with the views that precede it in the sequence.

        Args:
            useful_view: Ordered, sliceable sequence of views.

        Returns:
            A list of ``(scanned_views, next_view)`` tuples, one per position,
            where ``scanned_views`` is ``useful_view[:i]`` (empty for the first
            entry) and ``next_view`` is ``useful_view[i]``.
        """
        return [
            (useful_view[:next_view_idx], next_view)
            for next_view_idx, next_view in enumerate(useful_view)
        ]