import torch
from torch import nn
import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log


@stereotype.pipeline("nbv_reconstruction_pipeline")
class NBVReconstructionPipeline(nn.Module):
    """Pipeline combining point, pose and sequence encoders with a view finder.

    In TRAIN mode the ground-truth ``delta_rot_6d`` is perturbed with noise
    and the view finder is asked to estimate the corresponding score.  In
    TEST mode the view finder is asked for the next best view given the
    encoded sequence.
    """

    # Fallback lower bound for the sampled time ``t`` when the config does
    # not provide ``"eps"``.
    # NOTE(review): 1e-5 is an assumed default; confirm against the SDE used
    # by the configured view finder.
    DEFAULT_EPS = 1e-5

    def __init__(self, config):
        """Build the sub-modules named in ``config``.

        Args:
            config: Mapping with the keys ``"pts_encoder"``,
                ``"pose_encoder"``, ``"seq_encoder"`` and ``"view_finder"``
                (each passed to ``ComponentFactory.create``), and optionally
                ``"eps"`` (lower bound of the sampled time).
        """
        super().__init__()
        self.config = config
        # ``pertube_data`` needs ``eps``; the original never defined it.
        self.eps = float(config["eps"]) if "eps" in config else self.DEFAULT_EPS
        self.pts_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["pts_encoder"]
        )
        self.pose_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["pose_encoder"]
        )
        self.seq_encoder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["seq_encoder"]
        )
        self.view_finder = ComponentFactory.create(
            namespace.Stereotype.MODULE, config["view_finder"]
        )

    def forward(self, data):
        """Dispatch to the train or test path according to ``data["mode"]``.

        Args:
            data: Dict holding ``"mode"`` plus the inputs required by the
                selected path.

        Returns:
            The output dict of ``forward_train`` or ``forward_test``.  For
            any other mode an error is logged via ``Log.error``; if that
            call returns, the result is ``None``.
        """
        mode = data["mode"]
        if mode == namespace.Mode.TRAIN:
            return self.forward_train(data)
        elif mode == namespace.Mode.TEST:
            return self.forward_test(data)
        else:
            Log.error("Unknown mode: {}".format(mode), True)

    def pertube_data(self, gt_delta_rot_6d):
        """Perturb the ground truth at a random time and build the target.

        Args:
            gt_delta_rot_6d: Ground-truth tensor whose first dimension is the
                batch size.

        Returns:
            Tuple ``(perturbed_x, random_t, target_score, std)`` where
            ``random_t`` is drawn uniformly from ``[eps, 1)`` per sample and
            ``std`` has been reshaped to ``(-1, 1)``.
        """
        batch_size = gt_delta_rot_6d.shape[0]
        # Sample on the input's own device: ``nn.Module`` has no ``device``
        # attribute, so the original ``self.device`` lookup raised.
        random_t = (
            torch.rand(batch_size, device=gt_delta_rot_6d.device)
            * (1.0 - self.eps)
            + self.eps
        )
        random_t = random_t.unsqueeze(-1)
        mu, std = self.view_finder.marginal_prob(gt_delta_rot_6d, random_t)
        std = std.view(-1, 1)
        z = torch.randn_like(gt_delta_rot_6d)
        perturbed_x = mu + z * std
        # Algebraically this is ``-z / std``; kept in the original form so
        # the numerical result is unchanged.
        target_score = -z * std / (std ** 2)
        return perturbed_x, random_t, target_score, std

    def _encode_sequence(self, data):
        """Encode every (points, pose) pair and fuse them into one feature."""
        pts_feat_list = []
        pose_feat_list = []
        # Encoders are called pairwise, in the same interleaved order as the
        # original loops in ``forward_train`` / ``forward_test``.
        for pts, pose in zip(data["pts_list"], data["pose_list"]):
            pts_feat_list.append(self.pts_encoder.encode_points(pts))
            pose_feat_list.append(self.pose_encoder.encode_pose(pose))
        return self.seq_encoder.encode_sequence(pts_feat_list, pose_feat_list)

    def forward_train(self, data):
        """Run the training path.

        Args:
            data: Dict with ``"pts_list"``, ``"pose_list"`` and
                ``"delta_rot_6d"``.

        Returns:
            Dict with ``"estimated_score"`` (view finder output),
            ``"target_score"`` and ``"std"``.
        """
        gt_delta_rot_6d = data["delta_rot_6d"]
        seq_feat = self._encode_sequence(data)

        perturbed_x, random_t, target_score, std = self.pertube_data(
            gt_delta_rot_6d
        )
        input_data = {
            "sampled_pose": perturbed_x,
            "t": random_t,
            "seq_feat": seq_feat,
        }
        estimated_score = self.view_finder(input_data)
        return {
            "estimated_score": estimated_score,
            "target_score": target_score,
            "std": std,
        }

    def forward_test(self, data):
        """Run the inference path.

        Args:
            data: Dict with ``"pts_list"`` and ``"pose_list"``.

        Returns:
            Dict with ``"estimated_delta_rot_6d"`` and
            ``"in_process_sample"``, the two values returned by
            ``view_finder.next_best_view``.
        """
        seq_feat = self._encode_sequence(data)
        estimated_delta_rot_6d, in_process_sample = (
            self.view_finder.next_best_view(seq_feat)
        )
        return {
            "estimated_delta_rot_6d": estimated_delta_rot_6d,
            "in_process_sample": in_process_sample,
        }