import torch
from torch import nn

import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log


@stereotype.pipeline("nbv_reconstruction_pipeline_local")
class NBVReconstructionLocalPointsPipeline(nn.Module):
    """Pipeline that encodes scanned views and scores or samples a pose.

    Four sub-modules are created through ``ComponentFactory`` from
    ``config["modules"]``: a points encoder, a pose encoder, a sequence
    encoder and a view finder.  ``forward`` dispatches on ``data["mode"]``
    to either the training path (score estimation on a perturbed pose) or
    the test path (sampling through ``view_finder.next_best_view``).
    """

    def __init__(self, config):
        """Build the sub-modules and read scalar options from ``config``.

        Args:
            config: Mapping providing ``"modules"`` (with the entries
                ``"pts_encoder"``, ``"pose_encoder"``, ``"seq_encoder"``
                and ``"view_finder"``), ``"eps"`` and
                ``"global_scanned_feat"``.
        """
        super().__init__()
        self.config = config
        self.module_config = config["modules"]

        build = ComponentFactory.create
        module_kind = namespace.Stereotype.MODULE
        # Creation order is kept as-is: it fixes the order in which the
        # sub-modules are registered on this nn.Module.
        self.pts_encoder = build(module_kind, self.module_config["pts_encoder"])
        self.pose_encoder = build(module_kind, self.module_config["pose_encoder"])
        self.seq_encoder = build(module_kind, self.module_config["seq_encoder"])
        self.view_finder = build(module_kind, self.module_config["view_finder"])

        # Lower bound of the random time drawn in ``pertube_data``.
        self.eps = float(self.config["eps"])
        # When truthy, ``get_main_feat`` appends a feature encoded from
        # ``data["combined_scanned_pts"]``.
        self.enable_global_scanned_feat = self.config["global_scanned_feat"]

    def forward(self, data):
        """Route ``data`` to the train or test path based on ``data["mode"]``.

        Args:
            data: Mapping with a ``"mode"`` entry plus whatever the selected
                path reads.

        Returns:
            The dict produced by ``forward_train`` or ``forward_test``.  For
            any other mode the error is reported through ``Log.error`` and
            nothing is returned explicitly.
        """
        mode = data["mode"]
        if mode == namespace.Mode.TRAIN:
            return self.forward_train(data)
        if mode == namespace.Mode.TEST:
            return self.forward_test(data)
        # NOTE(review): the ``True`` flag presumably makes Log.error abort;
        # confirm against PytorchBoot.utils.Log.
        Log.error("Unknown mode: {}".format(mode), True)

    def pertube_data(self, gt_delta_9d):
        """Perturb ``gt_delta_9d`` with noise at a random time.

        One time value per row is drawn uniformly from ``[eps, 1)``.  The
        mean and standard deviation returned by
        ``view_finder.marginal_prob`` for that time are then used to add
        Gaussian noise to the input.

        Args:
            gt_delta_9d: Tensor whose first dimension is the batch size.

        Returns:
            Tuple ``(perturbed_x, random_t, target_score, std)`` where
            ``random_t`` has shape ``(batch, 1)`` and ``std`` has been
            reshaped to a single column.
        """
        batch_size = gt_delta_9d.shape[0]
        uniform = torch.rand(batch_size, device=gt_delta_9d.device)
        # Rescale [0, 1) to [eps, 1), then add a trailing dimension.
        random_t = (uniform * (1. - self.eps) + self.eps).unsqueeze(-1)

        mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t)
        # Column shape so std broadcasts across each row of the noise.
        std = std.view(-1, 1)

        noise = torch.randn_like(gt_delta_9d)
        perturbed_x = mu + noise * std
        target_score = - noise * std / (std ** 2)
        return perturbed_x, random_t, target_score, std

    def forward_train(self, data):
        """Estimate the score of a perturbed target pose.

        Args:
            data: Mapping with ``"best_to_world_pose_9d"`` and the entries
                read by ``get_main_feat``.

        Returns:
            Dict with ``"estimated_score"`` (output of ``view_finder``),
            ``"target_score"`` and ``"std"``.
        """
        main_feat = self.get_main_feat(data)

        # Perturb the target pose; this also yields the std and the score
        # target that go into the output.
        perturbed_x, random_t, target_score, std = self.pertube_data(
            data["best_to_world_pose_9d"]
        )

        estimated_score = self.view_finder({
            "sampled_pose": perturbed_x,
            "t": random_t,
            "main_feat": main_feat,
        })
        return {
            "estimated_score": estimated_score,
            "target_score": target_score,
            "std": std,
        }

    def forward_test(self, data):
        """Sample poses with ``view_finder.next_best_view``.

        The main feature is tiled ``repeat_num`` times along its first
        dimension (``data["repeat_num"]``, default 50) before sampling.

        Args:
            data: Mapping with the entries read by ``get_main_feat`` and an
                optional ``"repeat_num"``.

        Returns:
            Dict with ``"pred_pose_9d"`` and ``"in_process_sample"``, the
            two values returned by ``next_best_view``.
        """
        main_feat = self.get_main_feat(data)
        repeat_num = data.get("repeat_num", 50)
        # repeat(n, 1) assumes main_feat is 2-D — TODO confirm with the
        # sequence encoder's output.
        tiled_feat = main_feat.repeat(repeat_num, 1)

        pred_pose_9d, in_process_sample = self.view_finder.next_best_view(tiled_feat)
        return {
            "pred_pose_9d": pred_pose_9d,
            "in_process_sample": in_process_sample,
        }

    def get_main_feat(self, data):
        """Encode the scanned views into the feature fed to the view finder.

        For every paired item of ``data["scanned_pts"]`` and
        ``data["scanned_n_to_world_pose_9d"]``, points and poses are moved
        to this module's device, encoded separately and concatenated on the
        last dimension.  The resulting list goes through
        ``seq_encoder.encode_sequence``.  If the global scanned feature is
        enabled, the encoding of ``data["combined_scanned_pts"]`` is
        appended on the last dimension.

        Args:
            data: Mapping with ``"scanned_pts"``,
                ``"scanned_n_to_world_pose_9d"`` and, when the global
                feature is enabled, ``"combined_scanned_pts"``.

        Returns:
            The main feature tensor.  A NaN anywhere in it is reported
            through ``Log.error`` first.
        """
        pts_batch = data['scanned_pts']
        pose_batch = data['scanned_n_to_world_pose_9d']
        device = next(self.parameters()).device

        feat_seq_list = []
        for pts, pose_9d in zip(pts_batch, pose_batch):
            pts_feat = self.pts_encoder.encode_points(pts.to(device))
            pose_feat = self.pose_encoder.encode_pose(pose_9d.to(device))
            feat_seq_list.append(torch.cat([pts_feat, pose_feat], dim=-1))

        main_feat = self.seq_encoder.encode_sequence(feat_seq_list)

        if self.enable_global_scanned_feat:
            # NOTE(review): unlike the per-sample tensors above, this input
            # is not moved to ``device`` here — presumably the caller
            # already places it there; confirm.
            global_scanned_feat = self.pts_encoder.encode_points(
                data['combined_scanned_pts']
            )
            main_feat = torch.cat([main_feat, global_scanned_feat], dim=-1)

        if torch.isnan(main_feat).any():
            Log.error("nan in main_feat", True)

        return main_feat