import torch
from torch import nn

import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log


@stereotype.pipeline("nbv_reconstruction_pipeline")
class NBVReconstructionPipeline(nn.Module):
    """Next-best-view (NBV) reconstruction pipeline.

    Encodes a sequence of scanned point clouds and their relative poses
    into a single sequence feature, then uses a score-based ``view_finder``
    module either to train on perturbed ground-truth poses (score matching)
    or to sample a next-best-view pose at test time.
    """

    def __init__(self, config):
        """Build all sub-modules from the component factory.

        Args:
            config: dict with keys "pts_encoder", "pose_encoder",
                "seq_encoder" and "view_finder", each naming a MODULE
                component resolvable by ``ComponentFactory``.
        """
        super().__init__()
        self.config = config
        self.pts_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["pts_encoder"]
        )
        self.pose_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["pose_encoder"]
        )
        self.seq_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["seq_encoder"]
        )
        self.view_finder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["view_finder"]
        )
        # Lower bound on the sampled diffusion time t (keeps t away from 0).
        self.eps = 1e-5

    def forward(self, data):
        """Dispatch to the train or test path based on ``data["mode"]``."""
        mode = data["mode"]
        if mode == namespace.Mode.TRAIN:
            return self.forward_train(data)
        elif mode == namespace.Mode.TEST:
            return self.forward_test(data)
        else:
            # NOTE(review): Log.error(..., True) presumably aborts or raises;
            # otherwise this path implicitly returns None -- TODO confirm.
            Log.error("Unknown mode: {}".format(mode), True)

    def pertube_data(self, gt_delta_9d):
        """Perturb ground-truth 9D poses for denoising score matching.

        NOTE(review): the name keeps the original spelling ("pertube" for
        "perturb") so existing callers are not broken.

        Args:
            gt_delta_9d: batch of ground-truth 9D pose deltas; assumed
                shape (B, 9) -- TODO confirm against the data loader.

        Returns:
            Tuple ``(perturbed_x, random_t, target_score, std)`` where
            ``target_score`` is the score of the Gaussian perturbation
            kernel at ``perturbed_x``.
        """
        bs = gt_delta_9d.shape[0]
        # Sample t uniformly in [eps, 1) so marginal_prob is well defined.
        random_t = (
            torch.rand(bs, device=gt_delta_9d.device) * (1. - self.eps) + self.eps
        )
        random_t = random_t.unsqueeze(-1)
        mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t)
        std = std.view(-1, 1)
        z = torch.randn_like(gt_delta_9d)
        perturbed_x = mu + z * std
        # Gaussian kernel score; algebraically equal to -z / std.
        target_score = - z * std / (std ** 2)
        return perturbed_x, random_t, target_score, std

    def forward_train(self, data):
        """Training pass: predict the score of perturbed best-view poses."""
        seq_feat = self.get_seq_feat(data)
        ''' get std '''
        best_to_1_pose_9d_batch = data["best_to_1_pose_9d"]
        perturbed_x, random_t, target_score, std = self.pertube_data(
            best_to_1_pose_9d_batch
        )
        input_data = {
            "sampled_pose": perturbed_x,
            "t": random_t,
            "seq_feat": seq_feat,
        }
        estimated_score = self.view_finder(input_data)
        output = {
            "estimated_score": estimated_score,
            "target_score": target_score,
            "std": std,
        }
        return output

    def forward_test(self, data):
        """Inference pass: sample a next-best-view pose from the view finder."""
        seq_feat = self.get_seq_feat(data)
        estimated_delta_rot_9d, in_process_sample = self.view_finder.next_best_view(
            seq_feat
        )
        result = {
            "pred_pose_9d": estimated_delta_rot_9d,
            "in_process_sample": in_process_sample,
        }
        return result

    def get_seq_feat(self, data):
        """Encode the scanned point-cloud / pose sequence into one feature.

        Moves each per-scan tensor onto this module's device before
        encoding, then aborts via ``Log.error`` if the sequence feature
        contains NaNs.
        """
        scanned_pts_batch = data['scanned_pts']
        scanned_n_to_1_pose_9d_batch = data['scanned_n_to_1_pose_9d']
        pts_feat_seq_list = []
        pose_feat_seq_list = []
        device = next(self.parameters()).device
        for scanned_pts, scanned_n_to_1_pose_9d in zip(
            scanned_pts_batch, scanned_n_to_1_pose_9d_batch
        ):
            scanned_pts = scanned_pts.to(device)
            scanned_n_to_1_pose_9d = scanned_n_to_1_pose_9d.to(device)
            pts_feat_seq_list.append(self.pts_encoder.encode_points(scanned_pts))
            pose_feat_seq_list.append(
                self.pose_encoder.encode_pose(scanned_n_to_1_pose_9d)
            )
        seq_feat = self.seq_encoder.encode_sequence(
            pts_feat_seq_list, pose_feat_seq_list
        )
        if torch.isnan(seq_feat).any():
            Log.error("nan in seq_feat", True)
        return seq_feat